Cloudflare Queues
rindexer allows you to stream blockchain events to Cloudflare Queues, enabling realtime processing of blockchain data in your Cloudflare Workers. This integration provides guaranteed message delivery, global distribution, and seamless integration with your Cloudflare-based backend infrastructure.
This goes under the contracts or native_transfers section of the YAML configuration file.
Configuration with rindexer
Cloudflare Queues configuration requires your Cloudflare API token, account ID, and queue definitions.
Example
name: RocketPoolETHIndexer
description: My first rindexer project
repository: https://github.com/joshstevens19/rindexer
project_type: no-code
networks:
- name: ethereum
chain_id: 1
rpc: https://mainnet.gateway.tenderly.co
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
cloudflare_queues:
api_token: ${CLOUDFLARE_API_TOKEN}
account_id: ${CLOUDFLARE_ACCOUNT_ID}
queues:
- queue_id: blockchain-transfers
networks:
- ethereum
events:
- event_name: Transfer
alias: RocketPoolTransfer
Message Format
The message sent to your Cloudflare Queue is already decoded and parsed into a JSON object with a message_id
field added for tracking.
message_id
- Unique identifier for this message from rindexerevent_name
- The name of the eventevent_signature_hash
- The event signature hash (keccak256 hash of the event signature)body
>event_data
- Array of decoded event data with transaction informationnetwork
- The network the event was emitted on
For example a transfer event would look like:
{
"message_id": "rindexer_stream__blockchain-transfers-transfer-chunk-0",
"event_name": "Transfer",
"event_signature_hash": "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
"body": {
"event_data": [
{
"from": "0x0338ce5020c447f7e668dc2ef778025ce3982662",
"to": "0x0338ce5020c447f7e668dc2ef778025ce3982662",
"value": "1000000000000000000",
"tx_information": {
"address": "0xae78736cd615f374d3085123a210448e74fc6393",
"block_hash": "0x8461da7a1d4b47190a01fa6eae219be40aacffab0dd64af7259b2d404572c3d9",
"block_number": "18718011",
"log_index": "0",
"network": "ethereum",
"transaction_hash": "0x145c6705ffbf461e85d08b4a7f5850d6b52a7364d93a057722ca1194034f3ba4",
"transaction_index": "0"
}
}
]
},
"network": "ethereum"
}
Cloudflare Worker Consumer
Here's an example of a Cloudflare Worker that consumes messages from your queues:
export default {
async queue(batch, env, ctx) {
for (const message of batch.messages) {
try {
for (const eventInfo of message.body.event_data) {
console.log(eventInfo);
// Result:
// {
// "from": "0xf081470f5c6fbccf48cc4e5b82dd926409dcdd67",
// "to": "0x58bd88f0c826bdc2d8adaf66abb66bb99d961a3d",
// "transaction_information": {
// "address": "0xae78736cd615f374d3085123a210448e74fc6393",
// "block_hash": "0x58d7fd9aab0a4023f812dd0919d70f63ffa9a92ee26d83d34b04fbb000b74e9b",
// "block_timestamp": "0x6567a397",
// "log_index": "0x1b0",
// "network": "ethereum",
// "transaction_hash": "0x7cef018bc6090ac7d5a73d6ffe1975c6d52353cad315de9b7c771db6648c7a44",
// "block_number": 18679752,
// "chain_id": 1,
// "transaction_index": 148
// },
// "value": "45336822342319436"
// }
}
message.ack();
} catch (error) {
console.error('Error processing message:', error);
message.retry();
}
}
}
};
YAML Config
api_token
Your Cloudflare API token with permissions to access Queues API.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
cloudflare_queues:
api_token: ${CLOUDFLARE_API_TOKEN}
account_id
Your Cloudflare Account ID where your queues are created.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
cloudflare_queues:
api_token: ${CLOUDFLARE_API_TOKEN}
account_id: ${CLOUDFLARE_ACCOUNT_ID}
queues
This is an array allowing you to configure multiple queues with different settings.
queue_id
The ID of the Cloudflare Queue to send messages to. This queue must already exist in your Cloudflare account. You can find the queue ID in your Cloudflare dashboard or via the Queues API.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
cloudflare_queues:
api_token: ${CLOUDFLARE_API_TOKEN}
account_id: ${CLOUDFLARE_ACCOUNT_ID}
queues:
- queue_id: blockchain-transfers
networks
This is an array of networks you want to stream to this queue.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
cloudflare_queues:
api_token: ${CLOUDFLARE_API_TOKEN}
account_id: ${CLOUDFLARE_ACCOUNT_ID}
queues:
- queue_id: blockchain-transfers
networks:
- ethereum
events
This is an array of events you want to stream to this queue.
event_name
This is the name of the event you want to stream to this queue, must match the ABI event name.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
cloudflare_queues:
api_token: ${CLOUDFLARE_API_TOKEN}
account_id: ${CLOUDFLARE_ACCOUNT_ID}
queues:
- queue_id: blockchain-transfers
networks:
- ethereum
events:
- event_name: Transfer
alias
This is an optional alias
you wish to assign to the event you want published to this Queue.
It is paired with the event name and allows consumers to have unique discriminator keys in the event of naming conflicts. E.g Transfer (ERC20) and Transfer (ERC721).
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
cloudflare_queues:
api_token: ${CLOUDFLARE_API_TOKEN}
account_id: ${CLOUDFLARE_ACCOUNT_ID}
queues:
- queue_id: blockchain-transfers
networks:
- ethereum
events:
- event_name: Transfer
alias: RocketPoolTransfer
conditions
This accepts an array of conditions you want to apply to the event data before sending to the queue.
You may want to filter on the stream based on the event data, if the event data has not got an index on the on the
solidity event you can not filter it over the logs. The conditions
filter is here to help you with this,
based on your ABI you can filter on the event data.
rindexer has enabled a special syntax which allows you to define on your ABI fields what you want to filter on.
>
- higher then (for numbers only)<
- lower then (for numbers only)=
- equals>=
- higher then or equals (for numbers only)<=
- lower then or equals (for numbers only)||
- or&&
- and
So lets look at an example lets say i only want to get transfer events which are higher then 2000000000000000000
RETH wei
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
cloudflare_queues:
api_token: ${CLOUDFLARE_API_TOKEN}
account_id: ${CLOUDFLARE_ACCOUNT_ID}
queues:
- queue_id: blockchain-transfers
networks:
- ethereum
events:
- event_name: Transfer
conditions:
- "value": ">=2000000000000000000"
We use the ABI input name value
to filter on the value field, you can find these names in the ABI file.
{
"anonymous":false,
"inputs":[
{
"indexed":true,
"internalType":"address",
"name":"from",
"type":"address"
},
{
"indexed":true,
"internalType":"address",
"name":"to",
"type":"address"
},
{
"indexed":false,
"internalType":"uint256",
"name":"value",
"type":"uint256"
}
],
"name":"Transfer",
"type":"event"
}
You can use the ||
or &&
to combine conditions.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
cloudflare_queues:
api_token: ${CLOUDFLARE_API_TOKEN}
account_id: ${CLOUDFLARE_ACCOUNT_ID}
queues:
- queue_id: blockchain-transfers
networks:
- ethereum
events:
- event_name: Transfer
conditions:
- "value": ">=2000000000000000000 && value <=4000000000000000000"
You can use the =
to filter on other aspects like the from
or to
address.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
cloudflare_queues:
api_token: ${CLOUDFLARE_API_TOKEN}
account_id: ${CLOUDFLARE_ACCOUNT_ID}
queues:
- queue_id: blockchain-transfers
networks:
- ethereum
events:
- event_name: Transfer
conditions:
- "from": "0x0338ce5020c447f7e668dc2ef778025ce3982662 || 0x0338ce5020c447f7e668dc2ef778025ce398266u"
- "value": ">=2000000000000000000 || value <=4000000000000000000"
If you have a tuple and you want to get that value you just use the object notation.
For example lets say we want to only get the events for profileId
from the quoteParams
tuple which equals 1
:
{
"anonymous": false,
"inputs": [
{
"components": [
{
"internalType": "uint256",
"name": "profileId",
"type": "uint256"
},
...
],
"indexed": false,
"internalType": "struct Types.QuoteParams",
"name": "quoteParams",
"type": "tuple"
},
...
],
"name": "QuoteCreated",
"type": "event"
}
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
cloudflare_queues:
api_token: ${CLOUDFLARE_API_TOKEN}
account_id: ${CLOUDFLARE_ACCOUNT_ID}
queues:
- queue_id: blockchain-transfers
networks:
- ethereum
events:
- event_name: Transfer
conditions:
- "quoteParams.profileId": "=1"