Calculate a 24-hour block-by-block moving average of the ETH-USDC pair on Uniswap
Suppose the user wants to calculate a 24-hour (~7200 blocks) moving average for the ETH-USDC pair on Uniswap. This can be done with the following steps:

1. Use the filtered collections `filtered_receipts_logs` and `token0_prices` from the previous example as input streams. We assume that the block numbers were mixed into the input stream `@input("ETHEREUM_RECEIPTS_LOGS")` using the `map` operator for later use.
2. Merge the two streams into a collection of tuples `(log, price)` using the `zip` operator.
3. Transform the logs into tuples `(block_number, price)` using the `map` operator.
4. Perform the following steps in parallel:
   - Sum the prices of all swaps in each block.
   - Count the number of swaps in each block.
5. Combine the two outputs of the previous step into a collection of tuples using the `zip` operator.
6. Use the `map` operator to transform each `(sum, count)` tuple into an average value (i.e., divide the sum by the count).
7. Use the `arrange_by_key` operator to index the average prices by block number.
8. Use the custom operator `moving_avg`, which can be applied to the arrangement collection, to compute the average value over the "largest" `block_window_size` (7200) elements in the arrangement (i.e., the average values for the last 7200 blocks).
Below is the code for the dataflow that performs the described steps:
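As a rough, non-authoritative stand-in (the dataflow DSL itself is not Python), the following sketch models the same steps with ordinary Python over an in-memory list of events. The input data and the names `swap_events` and `BLOCK_WINDOW_SIZE` are hypothetical, and operators such as `zip`, `map`, `arrange_by_key`, and `moving_avg` are only mimicked here with plain Python constructs.

```python
BLOCK_WINDOW_SIZE = 7200  # ~24 hours of Ethereum blocks (hypothetical constant name)

# Hypothetical input: swap logs already zipped with token0 prices and mapped to
# (block_number, price) tuples (steps 1-3 of the list above).
swap_events = [
    (19_000_000, 2305.1),
    (19_000_000, 2306.4),
    (19_000_001, 2304.9),
    # ...
]

# Steps 4-6: per-block sum and count, then the per-block average price.
sums, counts = {}, {}
for block_number, price in swap_events:
    sums[block_number] = sums.get(block_number, 0.0) + price
    counts[block_number] = counts.get(block_number, 0) + 1
block_avg = {b: sums[b] / counts[b] for b in sums}

# Step 7: "arrange by key" -- index the per-block averages by block number.
arranged = dict(sorted(block_avg.items()))

# Step 8: moving average over the "largest" BLOCK_WINDOW_SIZE keys,
# i.e. the most recent ~7200 blocks present in the arrangement.
window = list(arranged.values())[-BLOCK_WINDOW_SIZE:]
moving_avg_24h = sum(window) / len(window)
print(moving_avg_24h)
```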
The `batch_by_key` operator informs the compiler that the keys (in our case, `log.block_number`) obtained using the `map` operator will be clustered together in the underlying stream of the collection. This allows the compiler to choose a more efficient implementation for subsequent operators. In this case, all logs within the same block will be grouped together in the stream, appearing consecutively. The `batch_by_key` operator utilizes an iterative context, which avoids indexing (transforming into an arrangement collection) in each of the following two parallel operations.
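A minimal way to picture this clustering, assuming a plain-Python stand-in for the stream: because logs for the same block arrive consecutively, per-block aggregation can be done in a single pass over adjacent keys without building an index first.

```python
from itertools import groupby

# Hypothetical stream of (block_number, price) tuples; entries for the same
# block appear consecutively, which is the property batch_by_key declares.
stream = [(100, 2305.1), (100, 2306.4), (101, 2304.9), (101, 2305.5), (101, 2306.0)]

# Because equal keys are adjacent, groupby aggregates each block in one pass,
# without arranging (indexing) the collection first.
per_block = [
    (block, sum(price for _, price in group))
    for block, group in groupby(stream, key=lambda item: item[0])
]
print(per_block)  # roughly [(100, 4611.5), (101, 6916.4)], up to float rounding
```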
To build an arrangement collection, we need to order the average price values by block number. For this purpose, we can use the built-in operator `arrange_by_key`. Internally, it constructs a Merkle tree (not a multiset hash) of the delta average prices and orders them by block number (key), providing efficient "random access" to historical updates using membership proofs. Then, we use the `consolidate` operator to instruct the data stream to always combine delta average prices for identical keys.
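As an illustrative, non-authoritative sketch (the Merkle-tree arrangement itself is not reproduced), consolidation can be pictured as summing the signed delta updates per block number so that the arrangement stores a single net value for each key. The update stream below is hypothetical.

```python
from collections import defaultdict

# Hypothetical stream of (block_number, delta_avg_price) updates, where a
# retraction is represented as a negative delta.
deltas = [(100, 2305.7), (101, 2304.2), (101, -2304.2), (101, 2304.8)]

# Consolidation: combine the deltas for identical keys into one net value.
consolidated = defaultdict(float)
for block_number, delta in deltas:
    consolidated[block_number] += delta

print(dict(consolidated))  # {100: 2305.7, 101: 2304.8}
```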
This scheme can be reused by any number of operators. To demonstrate this, the user also calculates a 1-hour (~300 blocks) moving average using the same ordered collection:
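Again only a non-authoritative Python sketch is given here; the point is that the same arranged (sorted) collection can feed several window sizes without being rebuilt. The helper below and its parameters are hypothetical stand-ins for the `moving_avg` operator.

```python
def moving_avg_sketch(arranged_averages, block_window_size):
    """Average of the per-block averages for the last `block_window_size` blocks.

    `arranged_averages` is assumed to be a dict sorted by block number,
    mirroring the arrangement produced by arrange_by_key (hypothetical stand-in).
    """
    window = list(arranged_averages.values())[-block_window_size:]
    return sum(window) / len(window)

# The same arrangement serves both windows: ~24 hours and ~1 hour.
# avg_24h = moving_avg_sketch(arranged, 7200)
# avg_1h = moving_avg_sketch(arranged, 300)
```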
In the custom operator `moving_avg`, the user specifies that the `arranged_items` in the input stream form an ordered collection using the keyword `arranged`. Ordered collections allow the operator to perform key-based search and to iterate over the values in the collection. The keyword `single` in the `output` declaration indicates that the output is a collection that should always contain a single element. This is syntactic sugar that, in combination with the `sum` operator, informs the compiler that each subsequent element in the input stream collection should result in removing the old value of the single-element collection, `current_moving_avg`, and adding a new value in which the sum of the values gradually accumulates. The operator also uses a generic type parameter, indicating that it can operate on any ordered collection of elements of any type `T` for which division and addition operations are well-defined.