Domain-Specific Language

Note: none of this is actually implemented yet. For now, the purpose of this page is to demonstrate how we anticipate developers will (eventually) use Maru once it becomes a general computational platform. Most, if not all, of this will likely change in the future.

Maru developers will be able to program their operators with Jutsu (name tentative), our DSL. Jutsu compiles down to a set of STARK-VM binaries (one for each operator) and a description file that specifies their inputs, how to chain them together, and how to graft them onto the existing dataflow graph. These binaries and the description file will then be uploaded to an "operator registry", from which the protocol will update its view of the dataflow and execute it.

Here are some examples of how we expect Jutsu programs to look:

Unary map

Let's say the user wanted to write an operator that takes a collection of ratios, each a 64-bit fixed-point decimal number represented as a u64, and turns them into percentages. They can define a function and then use the map operator to convert the ratios.

// They might write it like this
fn to_pct(ratio: u64): u64 {
    return ratio * 100
}

// or like this
fn to_pct = (ratio: u64) => ratio * 100
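
To wire this into the dataflow, they could then apply the function with the map operator, mirroring the later examples (the stream ID below is purely illustrative):

export collection percentages = @input("RATIO_STREAM_ID")
    .map(to_pct)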

Binary map

Let's say the user wanted to write an operator that takes a collection of NFT prices in wei and converts them to USDC given the current exchange rate (a single-element collection). They can "merge" the two collections into a single collection of tuples using the zip operator. Depending on what the two streams in question are, zip may be implemented differently under the hood by the compiler. In this case, the zip can be elided away completely: the exchange rate is pushed as an input directly into the underlying VM of the map operator.

// They might write it like this:
fn nft_price_to_usdc((meta, price): (NFTMeta, u256), exchange_rate: u256): (NFTMeta, u256) {
    let converted_price = price * exchange_rate;
    (meta, converted_price)
}

// or like this
fn nft_price_to_usdc((meta, price): (NFTMeta, u256), exchange_rate: u256): (NFTMeta, u256) => (meta, price * exchange_rate)

// then, they can build the dataflow and `export` the output collection:
export collection nfts_priced_in_usdc = @input("PRICE_STREAM_ID")
    .zip("EXCHANGE_RATE_STREAM_ID")
    .map(nft_price_to_usdc)

Filter for all of the prices for ETH-USDC swaps on Uniswap

Let's say the user wanted to write an operator that produces a collection of prices for every ETH-USDC swap on Uniswap given a collection of every Ethereum log ever. They can do this using the filter operator.

const USDC_ETH_SWAP_LOG_SIGNATURE = "CONTRACT_LOG_SIGNATURE"

export collection filtered_logs = @input("ETHEREUM_LOGS")
    .filter(log => log.signature == USDC_ETH_SWAP_LOG_SIGNATURE)

export collection prices = filtered_logs.map(log => log.price)

Compute a 24-hour block-by-block moving average of the ETH-USDC pair on Uniswap

Let's say the user wants to compute a 24-hour (~7200 blocks) moving average over the ETH-USDC pair on Uniswap. They can do this with the following steps:

  1. use the ETH-USDC swap logs from the previous example as input

  2. map the logs to (block_number, price) tuples

  3. in parallel, do the following:

    • sum the prices for all of the swaps in each block

    • count the number of swaps in each block

  4. zip the two outputs of the previous step together

  5. map the (sum, count) pairs to averages (i.e. divide them)

  6. use an arrangement to index the averages by block number

  7. use a custom moving_avg operator that takes in an arrangement and computes an average over the "greatest" window_len elements in the arrangement (i.e. the averages for the 7200 most recent blocks)

// Map logs to (block_number, price) tuples, clustered by block number
collection batched_logs = @input("ETH_USDC_SWAP_LOGS")
    .map(log => (log.block_number, log.price))
    .batch_by_key()

collection volumes = batched_logs.sum_by_key()
collection counts = batched_logs.count_by_key()

// Arrange averages by block number
arrangement block_averages = volumes
    .zip(counts)
    .map(((block_num, volume), (_, count)) => (block_num, volume / count))
    .arrange_by_key()
    .consolidate()

export collection moving_avg_24hr = block_averages.moving_avg(7200)
export collection moving_avg_1hr = block_averages.moving_avg(300)

operator moving_avg<T: Div<T> + Add<T>>(window_len: usize) {
    input arranged items: T
    output single current_window = items
        .iter_descending()
        .take(window_len)
        .map(block_avg => block_avg / window_len)
        .sum()
}

The batch_by_key operator tells the compiler that the keys produced by the map function will be clustered in the stream underlying the collection, which allows it to choose a more efficient implementation for subsequent operators. In this case, all logs from the same block will appear adjacent to each other in the underlying stream. Without the batch_by_key hint, sum_by_key would have to build an arrangement (expensive); with it, the compiler can use an iteration context instead, which is much cheaper here.
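
For comparison, a version without the hint might look like the sketch below; since the compiler no longer knows the keys arrive clustered, it would fall back to building an arrangement keyed by block number before summing:

collection volumes_naive = @input("ETH_USDC_SWAP_LOGS")
    .map(log => (log.block_number, log.price))
    .sum_by_key()  // no clustering hint: the compiler must arrange by key first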

For the arrangement, the user needs to arrange the averages by block number. They can use the built-in arrange_by_key operator for that. Internally, it builds a Merkle tree (not a multiset hash) of the deltas and orders them by key, allowing efficient "random access" over historical updates through membership proofs. The consolidate operator then tells the dataflow to always consolidate deltas for the same key. This arrangement can be re-used by any number of operators; to demonstrate this, the user also computes a 1-hour (~300 blocks) moving average using the same arrangement.
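
As a rough illustration of what consolidation buys (the delta notation here is hypothetical), deltas touching the same key collapse into the net update for that key:

// before consolidation: three deltas for the same block
//   ((block_num, old_avg), +1)
//   ((block_num, old_avg), -1)
//   ((block_num, new_avg), +1)
// after consolidation: only the net delta for the key remains
//   ((block_num, new_avg), +1)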

Then they can define a custom moving_avg operator by dropping down to the lower-level differential interface, which requires them to specify three things: input declarations, output declarations, and the logic itself. The logic is expressed the same way you'd write the program itself; conceptually, it can be thought of as a sort of "sub-dataflow". In the moving_avg operator, the user specifies that the items input is an arranged collection using the arranged keyword. Arranged collections allow the operator to perform key-value lookups and iterate over the collection's values. The single keyword on the output declaration signifies that the output is a collection that should always contain exactly one element. This is syntactic sugar for saying that each input delta should result in "removing the old window and adding the new window". The operator also takes a generic type parameter, signifying that it can build windows over any arranged collection whose elements are of a type T for which division and addition are well-defined.
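
For intuition, the desugared behavior of single might look like the following (hypothetical delta notation): each time an update changes the window average, the operator retracts the old value and inserts the new one, so the output collection always holds a single element.

// output deltas for one update to the window
//   (old_window_avg, -1)  // retract the previous window average
//   (new_window_avg, +1)  // insert the new window average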

Compiler Optimizations

The Jutsu compiler could be smart enough to consolidate operators expressed semantically into significantly more efficient concrete implementations, given what information it has about them. For instance, in the implementation for moving_avg above, instead of writing

.fold(moving_avg, (moving_avg, block_avg) => moving_avg + block_avg / window_len)

we write

.map(block_avg => block_avg / window_len)
.sum()

While the programmer logically expressed more operators, the compiler can assume addition is commutative, associative, and invertible, so it can produce a differential operator that exploits this fact, avoiding the need to iterate over the window altogether (subtract the quotient for the value falling out of the window and add the quotient for the value entering it). In contrast, the compiler can't assume the function given to fold has any of these properties.
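
Concretely (a sketch of the idea, not actual compiler output), once the compiler knows the aggregation is a plain sum, each block can be folded into the window in constant time:

// when the average for block `b` arrives and block `b - window_len` falls out
// of the window, the generated operator can update the output as
//   new_window = old_window + avg[b] / window_len - avg[b - window_len] / window_len
// instead of re-iterating over all window_len elements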
