Transforming Ethereum data with dbt

8 minute read

In a previous post, we explored how to extract data from Ethereum using Airbyte. Now that we have our raw data sitting in a database, it’s time to start transforming it to meet our specific needs. In this particular case, we’ll be using dbt for these transformations.

If you’re new to dbt, this is a great starting point:

By following these steps, we are on our way to unlock the potential of dbt. Let’s dive in and see what we can do!

Creating our first dbt project

To streamline our deployment process in the future, we’ll be making use of Docker. This will allow us to easily manage our dbt environment and ensure that everything is running smoothly. Here’s how we’ll get started:

First, we’ll need to pull the correct dbt image by running the following command:

docker pull ghcr.io/dbt-labs/dbt-postgres

Once that’s done, we’ll need to create a skeleton to work with. To do this, we’ll run the Docker equivalent of the dbt init <project_name> command:

mkdir dbt_projects
mkdir dbt_projects/profiles
cd dbt_projects
docker run -it \
--platform linux/amd64 \
--network=host \
--mount type=bind,source=/Users/leodonethat/dbt_projects,target=/usr/app \
--mount type=bind,source=/Users/leodonethat/dbt_projects/profiles/,target=/root/.dbt/ \
ghcr.io/dbt-labs/dbt-postgres \
init dbt_ethereum
10:47:01  Running with dbt=1.2.3
Which database would you like to use?
[1] postgres

(Don't see the one you want? https://docs.getdbt.com/docs/available-adapters)

Enter a number: 1
10:47:03  Profile dbt_ethereum written to /root/.dbt/profiles.yml using target's sample configuration. Once updated, you'll be able to start developing with dbt.
10:47:03
Your new dbt project "dbt_ethereum" was created!

For more information on how to configure the profiles.yml file,
please consult the dbt documentation here:

  https://docs.getdbt.com/docs/configure-your-profile

One more thing:

Need help? Don't hesitate to reach out to us via GitHub issues or on Slack:

  https://community.getdbt.com/

Happy modeling!

dbt_profiles_yml

Next, we’ll update the profiles.yml file with the credentials for our local Postgres database.

dbt-ethereum:
  target: dev
  outputs:
    dev:
      type: postgres
      threads: 1
      host: 127.0.0.1
      port: 5432
      user: postgres
      pass: postgres
      dbname: eth
      schema: dbt

Finally, we’ll want to take a look at the dbt_project.yml file in the newly created dbt_ethereum directory. This file can be updated to suit our specific needs and requirements.

dbt_project_yml

It’s worth noting that there are two example models included in the directory: my_first_dbt_model.sql and my_second_dbt_model.sql. While they don’t do much on their own, they’re a great way to ensure that we’re heading in the right direction and that everything is working as expected.

Running our dbt project

The next step is to try the docker equivalent of dbt run (for our newly created project):

docker run -it \
--platform linux/amd64 \
--network=host \
--mount type=bind,source=/Users/leodonethat/dbt_projects,target=/usr/app \
--mount type=bind,source=/Users/leodonethat/dbt_projects/profiles/,target=/root/.dbt/ \
ghcr.io/dbt-labs/dbt-postgres \
run --project-dir /usr/app/dbt/dbt_ethereum
11:18:06  Running with dbt=1.2.3
11:18:06  Partial parse save file not found. Starting full parse.
11:18:10  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 256 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
11:18:10
11:18:11  Concurrency: 1 threads (target='dev')
11:18:11
11:18:11  1 of 2 START table model dbt.my_first_dbt_model ................................ [RUN]
11:18:11  1 of 2 OK created table model dbt.my_first_dbt_model ........................... [SELECT 2 in 0.58s]
11:18:11  2 of 2 START table model dbt.my_second_dbt_model ............................... [RUN]
11:18:12  2 of 2 OK created table model dbt.my_second_dbt_model .......................... [SELECT 1 in 0.23s]
11:18:12
11:18:12  Finished running 2 table models in 0 hours 0 minutes and 1.75 seconds (1.75s).
11:18:12
11:18:12  Completed successfully
11:18:12
11:18:12  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2

Everything looks good!

If we go to postico to see what is in our postgres database we will two new tables in the dbt schema:

dbt_postico_schema_png


This is fantastic news! Our basic setup has been successful, and we can now move on to tackling more complex tasks.

Updating the logic

Our next task is transforming the raw data we extracted with Airbyte, and this is where dbt really shines!

Understanding data from Airbyte

Our first step is taking a look at the data we get from Airbyte

dbt__airbyte_raw_ethereum_blocks

Each row has only three columns, where the field that really interests us is _airbyte_data. It’s a json string with the result of the eth_getBlockByNumber api call.

select _airbyte_data
from _airbyte_raw_ethereum_blocks
order by _airbyte_emitted_at asc
limit 1

{
  "hash": "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
  "size": "0x21c",
  "miner": "0x0000000000000000000000000000000000000000",
  "nonce": "0x0000000000000042",
  "number": "0x0",
  "uncles": [
  ],
  "gasUsed": "0x0",
  "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
  "gasLimit": "0x1388",
  "extraData": "0x11bbe8db4e347b4e8c937c1c8370e4b5ed33adb3db69cbdb7a38e1e50b1b82fa",
  "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
  "stateRoot": "0xd7f8974fb5ac78d9ac099b9ad5018bedc2ce0a72dad1827a1709da30580f0544",
  "timestamp": "0x0",
  "difficulty": "0x400000000",
  "parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
  "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
  "receiptsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
  "transactions": [
  ],
  "totalDifficulty": "0x400000000",
  "transactionsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"
}

Transforming data from Airbyte

Let’s move on to the next step in our dbt project, which may be small, but is certainly significant! Our goal is to create a table called blocks from the json results in _airbyte_data. Additionally, we will be adding an extra field at the end that shows the number of transactions within each block.

It’s important to note that the operator -> returns a json object, while ->> returns a string.

To achieve this, we will create a new file called blocks.sql in the models directory.

with 
blocks as (
    select
        _airbyte_data ->> 'number' as number,
        _airbyte_data ->> 'hash' as hash,
        _airbyte_data ->> 'parentHash' as parentHash,
        _airbyte_data ->> 'nonce' as nonce,
        _airbyte_data ->> 'sha3Uncles' as sha3Uncles,
        _airbyte_data ->> 'logsBloom' as logsBloom,
        _airbyte_data ->> 'transactionsRoot' as transactionsRoot,
        _airbyte_data ->> 'stateRoot' as stateRoot,
        _airbyte_data ->> 'receiptsRoot' as receiptsRoot,
        _airbyte_data ->> 'miner' as miner,
        _airbyte_data ->> 'difficulty' as difficulty,
        _airbyte_data ->> 'totalDifficulty' as totalDifficulty,
        _airbyte_data ->> 'extraData' as extraData,
        _airbyte_data ->> 'size' as size,
        _airbyte_data ->> 'gasLimit' as gasLimit,
        _airbyte_data ->> 'gasUsed' as gasUsed,
        _airbyte_data ->> 'mixHash' as mixHash,
        _airbyte_data ->> 'timestamp' as timestamp,
        _airbyte_data ->> 'uncles' as uncles,
        _airbyte_data ->> 'transactions' as transactions,
        json_array_length((_airbyte_data ->> 'transactions')::json) as numberTransactions
    from
        _airbyte_raw_ethereum_blocks
)
select
    *
from
    blocks

Once we run dbt again, the dbt.blocks table should now be available in our Postgres database!

In order to keep the momentum going, our next step will be to add a new model called transactions. This model will involve unpacking the data that we previously extracted and transformed in the blocks table.

with 
blocks_transactions as (
    select
        json_array_elements(transactions::json) as transaction
    from
        {{ ref('blocks') }}
),
transactions as (
    select
        transaction ->> 'r' as r,
        transaction ->> 's' as s,
        transaction ->> 'v' as v,
        transaction ->> 'to' as to,
        transaction ->> 'gas' as gas,
        transaction ->> 'from' as from,
        transaction ->> 'hash' as hash,
        transaction ->> 'type' as type,
        transaction ->> 'input' as input,
        transaction ->> 'nonce' as nonce,
        transaction ->> 'value' as value,
        transaction ->> 'chainId' as chainId,
        transaction ->> 'gasPrice' as gasPrice,
        transaction ->> 'blockHash' as blockHash,
        transaction ->> 'blockNumber' as blockNumber,
        transaction ->> 'transactionIndex' as transactionIndex
    from
        blocks_transactions
)
select
    *
from
    transactions

This particular step introduces two new pieces of information. Firstly, we will be using the json_array_elements function which is capable of breaking down each element within a json array into its own individual row. Additionally, we will also be referencing the blocks model, which is our first instance of dbt syntax.

To proceed, we will delete any existing example models and initiate a full refresh of our dbt project. We can use the same docker command as before, but with a modified final part:

run --full-refresh --project-dir /usr/app/dbt/dbt_ethereum
06:44:57  Running with dbt=1.2.3
06:44:57  Found 2 models, 6 tests, 0 snapshots, 0 analyses, 256 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
06:44:57
06:44:58  Concurrency: 1 threads (target='dev')
06:44:58
06:44:58  1 of 2 START table model dbt.blocks ............................................ [RUN]
06:44:59  1 of 2 OK created table model dbt.blocks ....................................... [SELECT 10 in 0.69s]
06:44:59  2 of 2 START table model dbt.transactions ...................................... [RUN]
06:44:59  2 of 2 OK created table model dbt.transactions ................................. [SELECT 499 in 0.40s]
06:44:59
06:44:59  Finished running 2 table models in 0 hours 0 minutes and 1.88 seconds (1.88s).
06:44:59
06:44:59  Completed successfully
06:44:59
06:44:59  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2

Writing our first data-shaping transformation

We now have a neatly organized table containing all the blockchain transactions. With this data, we can explore various avenues of analysis and let our creativity run wild.

To take the first step in that direction, our next task will be to create a transformation that can calculate daily aggregations for the number of transactions, as well as their total value.

It’s important to note that we will be using a custom user-defined function called hex_to_num to simplify conversions. Although this function could have been used in previous transformations, we will keep the code as straightforward as possible for the purpose of this proof of concept.

select
    date(date_trunc('day', to_timestamp(hex_to_num(blk.timestamp)))) as date_ymd,
    count(distinct tx.hash) as number_transactions,
    count(distinct tx.blockNumber) as number_blocks,
    sum(hex_to_num(value)) as total_value_wei,
    sum(hex_to_num(value) * (1/pow(10,18))) as total_value_eth
from
    {{ ref('transactions') }}  as tx
    left join {{ ref('blocks') }} as blk on (tx.blocknumber = blk.number)
group by
    date_ymd

To proceed, we will run the docker version of dbt run once more. Once the process is complete, we can take a quick glance at our database by running a simple query.

select *
from dbt.transactions_value_daily
order by date_ymd asc

dbt_postico_value_daily

Fantastic! 🚀

We can now easily visualize the table with the required data. We have multiple options to access this table, such as using JupyterHub for data analysis via notebooks or using Metabase, a visualization tool, to create interactive dashboards.

It’s important to note that for this proof of concept, we assumed the underlying data is static and processed all of it at once. This approach is suitable for analytical purposes when we want to study past behavior. However, it doesn’t provide the complete picture of a blockchain. To productionize this pipeline, we need to process new blocks efficiently. This involves setting up Airbyte to extract blocks from where it previously stopped and updating our dbt models to process new data incrementally.



Source Code