IoTDB provides multiple built-in functions and user-defined functions (UDFs) to meet users' computing requirements.
Table 1 lists the UDF types supported by IoTDB.
To write a UDTF, you need to inherit the org.apache.iotdb.db.query.udf.api.UDTF class and implement at least the beforeStart method and one transform method.
Table 2 describes all interfaces that can be implemented by users.
Interface Definition |
Description |
Mandatory |
---|---|---|
void validate(UDFParameterValidator validator) throws Exception |
This method is used to validate UDFParameters and is executed before beforeStart is called. |
No |
void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception |
This is an initialization method used to call the user-defined initialization behavior before the UDTF processes the input data. Each time a user executes a UDTF query, the framework constructs a new UDF instance, and this method is called. It is called only once in the lifecycle of each UDF instance. |
Yes |
void transform(Row row, PointCollector collector) throws Exception |
This method is called by the framework. When you choose to use the RowByRowAccessStrategy strategy in beforeStart to consume raw data, this data processing method is called. The input data is passed in by Row, and the result is output by PointCollector. You need to call the data collection method provided by collector in this method to determine the output data. |
Use either this method or transform(RowWindow rowWindow, PointCollector collector). |
void transform(RowWindow rowWindow, PointCollector collector) throws Exception |
This method is called by the framework. When you choose to use the SlidingSizeWindowAccessStrategy or SlidingTimeWindowAccessStrategy strategy in beforeStart to consume raw data, this data processing method will be called. The input data is passed in by RowWindow, and the result is output by PointCollector. You need to call the data collection method provided by collector in this method to determine the output data. |
Use either this method or transform(Row row, PointCollector collector). |
void terminate(PointCollector collector) throws Exception |
This method is called by the framework. This method is called after all transform calls have been executed and before beforeDestory is called. In a single UDF query, this method will be called only once. You need to call the data collection method provided by collector in this method to determine the output data. |
No |
void beforeDestroy() |
This method is called by the framework after the last input data is processed, and will be called only once in the lifecycle of each UDF instance. |
No |
Calling sequence of each method:
Each time the framework executes a UDTF query, a new UDF instance will be constructed. When the query ends, this UDF instance will be destroyed. Therefore, the internal data of the instances in different UDTF queries (even in the same SQL statement) is isolated. You can maintain some state data in the UDTF without considering the impact of concurrency and other factors.
Interface usage:
The validate method is used to validate the parameters entered by users.
In this method, you can limit the number and types of input time series, check the attributes of user input, or perform any custom logic verification.
Using this method, you can do the following things:
UDFParameters is used to parse the UDF parameters in SQL statements (the part in the parentheses following the UDF name in the SQL statements). The parameters include two parts. The first part is the path and its data type of the time series to be processed by the UDF. The second part is the key-value pair attributes for customization.
Example:
SELECT UDF(s1, s2, 'key1'='iotdb', 'key2'='123.45') FROM root.sg.d;
Usage:
void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception { // parameters for (PartialPath path : parameters.getPaths()) { TSDataType dataType = parameters.getDataType(path); // do something } String stringValue = parameters.getString("key1"); // iotdb Float floatValue = parameters.getFloat("key2"); // 123.45 Double doubleValue = parameters.getDouble("key3"); // null int intValue = parameters.getIntOrDefault("key4", 678); // 678 // do something // configurations // ... }
You can use UDTFConfigurations to specify the strategy used by the UDF to access raw data and the type of the output time series.
Usage:
void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception { // parameters // ... // configurations configurations .setAccessStrategy(new RowByRowAccessStrategy()) .setOutputDataType(TSDataType.INT32); }
The setAccessStrategy method is used to set the strategy used by the UDF to access raw data. The setOutputDataType method is used to set the data type of the output time series.
Note that the raw data access strategy you set here determines which transform method the framework will call. Implement the transform method corresponding to the raw data access strategy. You can also dynamically decide which strategy to set based on the attribute parameters parsed by UDFParameters. Therefore, the two transform methods are also allowed to be implemented in one UDF.
The following are the strategies you can set.
Interface Definition |
Description |
transform Method to Call |
---|---|---|
RowByRowAccessStrategy |
Processes raw data row by row. The framework calls the transform method once for each row of raw data input. When a UDF has only one input time series, a row of input is a data point in the input time series. When a UDF has multiple input time series, a row of input is a result record of the raw query (aligned by time) on these input time series. (In a row, there may be a column with a value of null, but not all of them are null.) |
void transform(Row row, PointCollector collector) throws Exception |
SlidingTimeWindowAccessStrategy |
Processes a batch of data in a fixed time interval each time. A data batch is called a window. The framework calls the transform method once for each raw data input window. A window may contain multiple rows of data. Each row of data is a result record of the raw query (aligned by time) on these input time series. (In a row, there may be a column with a value of null, but not all of them are null.) |
void transform(RowWindow rowWindow, PointCollector collector) throws Exception |
SlidingSizeWindowAccessStrategy |
Processes raw data batch by batch, and each batch contains a fixed number of raw data rows (except the last batch). A data batch is called a window. The framework calls the transform method once for each raw data input window. A window may contain multiple rows of data. Each row of data is a result record of the raw query (aligned by time) on these input time series. (In a row, there may be a column with a value of null, but not all of them are null.) |
void transform(RowWindow rowWindow, PointCollector collector) throws Exception |
The construction of RowByRowAccessStrategy does not require any parameters.
The display window on the time axis is optional. If these parameters are not provided, the start time of the display window will be set to the same as the minimum timestamp of the query result set, and the end time of the display window will be set to the same as the maximum timestamp of the query result set.
The sliding step parameter is also optional. If the parameter is not provided, the sliding step will be set to the same as the time interval for dividing the time axis.
The following figure shows the relationship between the three types of parameters.
Note that the actual time interval of some of the last time windows may be less than the specified time interval parameter. In addition, the number of data rows in some time windows may be 0. In this case, the framework will also call the transform method for the empty windows.
The sliding step parameter is optional. If this parameter is not provided, the sliding step will be set to the same as the window size.
Note that the type of output time series you set here determines the type of data that PointCollector in the transform method can actually receive. The relationship between the output data type set in setOutputDataType and the actual data output type that PointCollector can receive is as follows.
Output Data Type Set in setOutputDataType |
Data Type That PointCollector Can Receive |
---|---|
INT32 |
int |
INT64 |
long |
FLOAT |
float |
DOUBLE |
double |
BOOLEAN |
boolean |
TEXT |
java.lang.String and org.apache.iotdb.tsfile.utils.Binary |
void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception { // do something // ... configurations .setAccessStrategy(new RowByRowAccessStrategy()) .setOutputDataType(parameters.getDataType(0)); }
You need to implement this method when you specify the strategy for the UDF to read raw data as RowByRowAccessStrategy in beforeStart.
This method processes one row of raw data at a time. The raw data is input from Row and output by PointCollector. You can choose to output any number of data points in one transform call. Note that the type of the output data points must be the same as you set in the beforeStart method, and the timestamp of the output data points must be strictly monotonically increasing.
The following is a complete UDF example that implements the void transform(Row row, PointCollector collector) throws Exception method. It is an adder that receives two columns of time series as input. When two data points in a row are not null, this UDF will output the algebraic sum of these two data points.
import org.apache.iotdb.db.query.udf.api.UDTF; import org.apache.iotdb.db.query.udf.api.access.Row; import org.apache.iotdb.db.query.udf.api.collector.PointCollector; import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations; import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters; import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; public class Adder implements UDTF { @Override public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) { configurations .setOutputDataType(TSDataType.INT64) .setAccessStrategy(new RowByRowAccessStrategy()); } @Override public void transform(Row row, PointCollector collector) throws Exception { if (row.isNull(0) || row.isNull(1)) { return; } collector.putLong(row.getTime(), row.getLong(0) + row.getLong(1)); } }
You need to implement this method when you specify the strategy for the UDF to read raw data as SlidingTimeWindowAccessStrategy or SlidingSizeWindowAccessStrategy.
This method processes a batch of data in a fixed number of rows or a fixed time interval each time, and the container containing this batch of data is called a window. The raw data is input from RowWindow and output by PointCollector. RowWindow can help you access a batch of rows, and it provides a set of interfaces for random access and iterative access to this batch of rows. You can choose to output any number of data points in one transform call. Note that the type of output data points must be the same as you set in the beforeStart method, and the timestamps of output data points must be strictly monotonically increasing.
The following is a complete UDF example that implements the void transform(RowWindow rowWindow, PointCollector collector) throws Exception method. It is a counter that receives any number of time series as input, and its function is to count and output the number of data rows in each time window within a specified time range.
import java.io.IOException; import org.apache.iotdb.db.query.udf.api.UDTF; import org.apache.iotdb.db.query.udf.api.access.RowWindow; import org.apache.iotdb.db.query.udf.api.collector.PointCollector; import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations; import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters; import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; public class Counter implements UDTF { @Override public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) { configurations .setOutputDataType(TSDataType.INT32) .setAccessStrategy(new SlidingTimeWindowAccessStrategy( parameters.getLong("time_interval"), parameters.getLong("sliding_step"), parameters.getLong("display_window_begin"), parameters.getLong("display_window_end"))); } @Override public void transform(RowWindow rowWindow, PointCollector collector) throws Exception { if (rowWindow.windowSize() != 0) { collector.putInt(rowWindow.getRow(0).getTime(), rowWindow.windowSize()); } } }
In some scenarios, a UDF needs to traverse all the raw data to calculate the final output data points. The terminate interface provides support for those scenarios.
This method is called after all transform calls have been executed and before beforeDestory is called. You can implement the transform method to perform pure data processing, and implement the terminate method to output the processing results.
The processing results need to be output by PointCollector. You can choose to output any number of data points in one terminate call. Note that the type of the output data points must be the same as you set in the beforeStart method, and the timestamp of the output data points must be strictly monotonically increasing.
The following is a complete UDF example that implements the void terminate(PointCollector collector) throws Exception method. It takes one time series whose data type is INT32 as input, and outputs the maximum value point of the series.
import java.io.IOException; import org.apache.iotdb.db.query.udf.api.UDTF; import org.apache.iotdb.db.query.udf.api.access.Row; import org.apache.iotdb.db.query.udf.api.collector.PointCollector; import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations; import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters; import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; public class Max implements UDTF { private Long time; private int value; @Override public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) { configurations .setOutputDataType(TSDataType.INT32) .setAccessStrategy(new RowByRowAccessStrategy()); } @Override public void transform(Row row, PointCollector collector) { int candidateValue = row.getInt(0); if (time == null || value < candidateValue) { time = row.getTime(); value = candidateValue; } } @Override public void terminate(PointCollector collector) throws IOException { if (time != null) { collector.putInt(time, value); } } }