Skip to content

Cloudflare Queues

rindexer allows you to stream blockchain events to Cloudflare Queues, enabling realtime processing of blockchain data in your Cloudflare Workers. This integration provides guaranteed message delivery, global distribution, and seamless integration with your Cloudflare-based backend infrastructure.

This goes under the contracts or native_transfers section of the YAML configuration file.

Configuration with rindexer

Cloudflare Queues configuration requires your Cloudflare API token, account ID, and queue definitions.

Example

contract events
name: RocketPoolETHIndexer
description: My first rindexer project
repository: https://github.com/joshstevens19/rindexer
project_type: no-code
networks:
- name: ethereum
  chain_id: 1
  rpc: https://mainnet.gateway.tenderly.co
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    cloudflare_queues: 
      api_token: ${CLOUDFLARE_API_TOKEN}
      account_id: ${CLOUDFLARE_ACCOUNT_ID}
      queues: 
        - queue_id: blockchain-transfers
          networks: 
            - ethereum
          events: 
            - event_name: Transfer
              alias: RocketPoolTransfer

Message Format

The message sent to your Cloudflare Queue is already decoded and parsed into a JSON object with a message_id field added for tracking.

  • message_id - Unique identifier for this message from rindexer
  • event_name - The name of the event
  • event_signature_hash - The event signature hash (keccak256 hash of the event signature)
  • body > event_data - Array of decoded event data with transaction information
  • network - The network the event was emitted on

For example a transfer event would look like:

{
    "message_id": "rindexer_stream__blockchain-transfers-transfer-chunk-0",
    "event_name": "Transfer",
    "event_signature_hash": "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
    "body": {
        "event_data": [
            {
                "from": "0x0338ce5020c447f7e668dc2ef778025ce3982662",
                "to": "0x0338ce5020c447f7e668dc2ef778025ce3982662",
                "value": "1000000000000000000",
                "tx_information": {
                    "address": "0xae78736cd615f374d3085123a210448e74fc6393",
                    "block_hash": "0x8461da7a1d4b47190a01fa6eae219be40aacffab0dd64af7259b2d404572c3d9",
                    "block_number": "18718011",
                    "log_index": "0",
                    "network": "ethereum",
                    "transaction_hash": "0x145c6705ffbf461e85d08b4a7f5850d6b52a7364d93a057722ca1194034f3ba4",
                    "transaction_index": "0"
                }
            }
        ]
    },
    "network": "ethereum"
}

Cloudflare Worker Consumer

Here's an example of a Cloudflare Worker that consumes messages from your queues:

export default {
  async queue(batch, env, ctx) {
    for (const message of batch.messages) {
      try {
        for (const eventInfo of message.body.event_data) {
            console.log(eventInfo);
            // Result:
            // {
            //   "from": "0xf081470f5c6fbccf48cc4e5b82dd926409dcdd67",
            //   "to": "0x58bd88f0c826bdc2d8adaf66abb66bb99d961a3d",
            //   "transaction_information": {
            //     "address": "0xae78736cd615f374d3085123a210448e74fc6393",
            //     "block_hash": "0x58d7fd9aab0a4023f812dd0919d70f63ffa9a92ee26d83d34b04fbb000b74e9b",
            //     "block_timestamp": "0x6567a397",
            //     "log_index": "0x1b0",
            //     "network": "ethereum",
            //     "transaction_hash": "0x7cef018bc6090ac7d5a73d6ffe1975c6d52353cad315de9b7c771db6648c7a44",
            //     "block_number": 18679752,
            //     "chain_id": 1,
            //     "transaction_index": 148
            //   },
            //   "value": "45336822342319436"
            // }
        }
 
        message.ack();
      } catch (error) {
        console.error('Error processing message:', error);
        message.retry();
      }
    }
  }
};

YAML Config

api_token

Your Cloudflare API token with permissions to access Queues API.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    cloudflare_queues: 
      api_token: ${CLOUDFLARE_API_TOKEN}

account_id

Your Cloudflare Account ID where your queues are created.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    cloudflare_queues: 
      api_token: ${CLOUDFLARE_API_TOKEN}
      account_id: ${CLOUDFLARE_ACCOUNT_ID}

queues

This is an array allowing you to configure multiple queues with different settings.

queue_id

The ID of the Cloudflare Queue to send messages to. This queue must already exist in your Cloudflare account. You can find the queue ID in your Cloudflare dashboard or via the Queues API.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    cloudflare_queues: 
      api_token: ${CLOUDFLARE_API_TOKEN}
      account_id: ${CLOUDFLARE_ACCOUNT_ID}
      queues: 
        - queue_id: blockchain-transfers

networks

This is an array of networks you want to stream to this queue.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    cloudflare_queues: 
      api_token: ${CLOUDFLARE_API_TOKEN}
      account_id: ${CLOUDFLARE_ACCOUNT_ID}
      queues: 
        - queue_id: blockchain-transfers
          networks: 
            - ethereum

events

This is an array of events you want to stream to this queue.

event_name

This is the name of the event you want to stream to this queue, must match the ABI event name.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    cloudflare_queues: 
      api_token: ${CLOUDFLARE_API_TOKEN}
      account_id: ${CLOUDFLARE_ACCOUNT_ID}
      queues: 
        - queue_id: blockchain-transfers
          networks:
            - ethereum
          events: 
            - event_name: Transfer
alias

This is an optional alias you wish to assign to the event you want published to this Queue.

It is paired with the event name and allows consumers to have unique discriminator keys in the event of naming conflicts. E.g Transfer (ERC20) and Transfer (ERC721).

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    cloudflare_queues: 
      api_token: ${CLOUDFLARE_API_TOKEN}
      account_id: ${CLOUDFLARE_ACCOUNT_ID}
      queues: 
        - queue_id: blockchain-transfers
          networks:
            - ethereum
          events: 
            - event_name: Transfer
              alias: RocketPoolTransfer

conditions

This accepts an array of conditions you want to apply to the event data before sending to the queue.

You may want to filter on the stream based on the event data, if the event data has not got an index on the on the solidity event you can not filter it over the logs. The conditions filter is here to help you with this, based on your ABI you can filter on the event data.

rindexer has enabled a special syntax which allows you to define on your ABI fields what you want to filter on.

  1. > - higher then (for numbers only)
  2. < - lower then (for numbers only)
  3. = - equals
  4. >= - higher then or equals (for numbers only)
  5. <= - lower then or equals (for numbers only)
  6. || - or
  7. && - and

So lets look at an example lets say i only want to get transfer events which are higher then 2000000000000000000 RETH wei

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    cloudflare_queues: 
      api_token: ${CLOUDFLARE_API_TOKEN}
      account_id: ${CLOUDFLARE_ACCOUNT_ID}
      queues: 
        - queue_id: blockchain-transfers
          networks:
            - ethereum
          events: 
            - event_name: Transfer
              conditions: 
                - "value": ">=2000000000000000000"

We use the ABI input name value to filter on the value field, you can find these names in the ABI file.

{
    "anonymous":false,
    "inputs":[
      {
        "indexed":true,
        "internalType":"address",
        "name":"from",
        "type":"address"
      },
      {
        "indexed":true,
        "internalType":"address",
        "name":"to",
        "type":"address"
      },
      {
        "indexed":false,
        "internalType":"uint256",
        "name":"value", 
        "type":"uint256"
      }
    ],
    "name":"Transfer",
    "type":"event"
}

You can use the || or && to combine conditions.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    cloudflare_queues: 
      api_token: ${CLOUDFLARE_API_TOKEN}
      account_id: ${CLOUDFLARE_ACCOUNT_ID}
      queues: 
        - queue_id: blockchain-transfers
          networks:
            - ethereum
          events: 
            - event_name: Transfer
              conditions: 
                - "value": ">=2000000000000000000 && value <=4000000000000000000"

You can use the = to filter on other aspects like the from or to address.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    cloudflare_queues: 
      api_token: ${CLOUDFLARE_API_TOKEN}
      account_id: ${CLOUDFLARE_ACCOUNT_ID}
      queues: 
        - queue_id: blockchain-transfers
          networks:
            - ethereum
          events: 
            - event_name: Transfer
              conditions: 
                - "from": "0x0338ce5020c447f7e668dc2ef778025ce3982662 || 0x0338ce5020c447f7e668dc2ef778025ce398266u"
                - "value": ">=2000000000000000000 || value <=4000000000000000000"

If you have a tuple and you want to get that value you just use the object notation.

For example lets say we want to only get the events for profileId from the quoteParams tuple which equals 1:

{
     "anonymous": false,
     "inputs": [
       {
         "components": [
           {
             "internalType": "uint256",
             "name": "profileId", 
             "type": "uint256"
           },
           ...
         ],
         "indexed": false,
         "internalType": "struct Types.QuoteParams",
         "name": "quoteParams", 
         "type": "tuple"
       },
       ...
     ],
     "name": "QuoteCreated", 
     "type": "event"
}
rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    cloudflare_queues: 
      api_token: ${CLOUDFLARE_API_TOKEN}
      account_id: ${CLOUDFLARE_ACCOUNT_ID}
      queues: 
        - queue_id: blockchain-transfers
          networks:
            - ethereum
          events: 
            - event_name: Transfer
              conditions: 
                - "quoteParams.profileId": "=1"