Indexers
When creating a new rust project with rindexer it will create you a indexers folder, this is where you will write your custom logic for the indexer. This is where you will do all your indexing logic, you can do anything you want in here, you can do http requests, on chain lookups, custom logic, custom DBs, anything you can think of. rindexer gives you the foundations and also baked in extendability. Rust enforces a strong type system, all logs will be streamed to you just focus on the logic you want.
By default if you turn storage postgres on in the YAML configuration file it will also create you postgres tables, also write SQL for you to use and expose you a postgres client. This is a great starting point for you to build on.
The tables creation can be skipped by using the disable_create_tables in the YAML configuration file.
If you also enable the CSV storage it will also generate code in the handler to write to that CSV files.
You can regenerate the indexers folder by running the following command:
rindexer codegen indexer
To help understand the interfaces and ways rindexer handlers can be extended we will look at an example.
Take this YAML file - All transfer events for reth on Ethereum between block 18600000 and 18718056 will be indexed.
name: rETHIndexer
description: My first rindexer project
repository: https://github.com/joshstevens19/rindexer
project_type: rust
networks:
- name: ethereum
chain_id: 1
rpc: https://mainnet.gateway.tenderly.co
storage:
postgres:
enabled: true
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: 18900000
end_block: 19000000
abi: ./abis/RocketTokenRETH.abi.json
include_events:
- Transfer
This would generate you a rocket_pool_eth.rs
file in the indexers folder, this file will have a handler function,
note that the name of the file is the contract name in snake case alongside if you are doing filters _filter
appended to it.
If you are using multiple events to index on a contract the file it will generate will have all the handlers in the single file.
Handlers
As you see here with this example out the box it will generate you all your indexer handlers for in this case the Transfer event. If you have postgres storage enabled it will have the bulk insert code written for you. The boilerplate code is runnable out the box.
use super::super::super::typings::rust::events::rocket_pool_eth::{
no_extensions, ApprovalEvent, RocketPoolETHEventType, TransferEvent,
};
use rindexer::{
event::callback_registry::EventCallbackRegistry, rindexer_error, rindexer_info,
EthereumSqlTypeWrapper, PgType, RindexerColorize,
};
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
|results, context| async move {
if results.is_empty() {
return Ok(());
}
let mut bulk_data: Vec<Vec<EthereumSqlTypeWrapper>> = vec![];
for result in results.iter() {
let data = vec![
EthereumSqlTypeWrapper::Address(result.tx_information.address),
EthereumSqlTypeWrapper::Address(result.event_data.from),
EthereumSqlTypeWrapper::Address(result.event_data.to),
EthereumSqlTypeWrapper::U256(result.event_data.value),
EthereumSqlTypeWrapper::H256(result.tx_information.transaction_hash),
EthereumSqlTypeWrapper::U64(result.tx_information.block_number),
EthereumSqlTypeWrapper::H256(result.tx_information.block_hash),
EthereumSqlTypeWrapper::String(result.tx_information.network.to_string()),
];
bulk_data.push(data);
}
if bulk_data.is_empty() {
return Ok(());
}
if bulk_data.len() > 100 {
let result = context
.database
.bulk_insert_via_copy(
"rust_rocket_pool_eth.transfer",
&[
"contract_address".to_string(),
"from".to_string(),
"to".to_string(),
"value".to_string(),
"tx_hash".to_string(),
"block_number".to_string(),
"block_hash".to_string(),
"network".to_string(),
],
&bulk_data
.first()
.ok_or("No first element in bulk data, impossible")?
.iter()
.map(|param| param.to_type())
.collect::<Vec<PgType>>(),
&bulk_data,
)
.await;
if let Err(e) = result {
rindexer_error!(
"RocketPoolETHEventType::Transfer inserting bulk data: {:?}",
e
);
return Err(e.to_string());
}
} else {
let result = context
.database
.bulk_insert(
"rust_rocket_pool_eth.transfer",
&[
"contract_address".to_string(),
"from".to_string(),
"to".to_string(),
"value".to_string(),
"tx_hash".to_string(),
"block_number".to_string(),
"block_hash".to_string(),
"network".to_string(),
],
&bulk_data,
)
.await;
if let Err(e) = result {
rindexer_error!(
"RocketPoolETHEventType::Transfer inserting bulk data: {:?}",
e
);
return Err(e.to_string());
}
}
rindexer_info!(
"RocketPoolETH::Transfer - {} - {} events",
"INDEXED".green(),
results.len(),
);
Ok(())
},
no_extensions(),
)
.await,
)
.register(registry);
}
pub async fn rocket_pool_eth_handlers(registry: &mut EventCallbackRegistry) {
transfer_handler(registry).await;
}
Event::Handler
rindexer hides all the complex rust types and abstracts everything for you so you can easily just build the logic within the handler itself.
As you see with the below you just write the logic and results
holds all the decoded event data and context
holds the database client and any extensions you pass to it.
The naming convention for the handler is {AbiEventName}Event::handler
so in this case TransferEvent::handler
this is so you can.
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
|results, context| async move {
// logic here
return Ok(());
},
no_extensions(),
)
.await,
)
.register(registry);
}
Why an async move?
rindexer has abstracted all the complex types for you so you can just focus on the logic you want to write, that said
rust demands knowing all the memory location of every element and when to drop references, this is why you need to use
async move
in the handler.
Results
This holds the event logs decoded information and the transaction information for the events.
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
// results = Vec<TransferResult>
|results, context| async move {
// logic here
return Ok(());
},
no_extensions(),
)
.await,
)
.register(registry);
}
#[derive(Debug, Clone)]
pub struct TransferResult {
pub event_data: TransferData,
pub tx_information: TxInformation,
}
The event_data
will be pointing to the ABI type generated using ethers
#[derive(
Clone,
::ethers::contract::EthEvent,
::ethers::contract::EthDisplay,
Default,
Debug,
PartialEq,
Eq,
Hash,
)]
#[ethevent(name = "Transfer", abi = "Transfer(address,address,uint256)")]
pub struct TransferFilter {
#[ethevent(indexed)]
pub from: ::ethers::core::types::Address,
#[ethevent(indexed)]
pub to: ::ethers::core::types::Address,
pub value: ::ethers::core::types::U256,
}
The tx_information
will be the transaction related information for the event
#[derive(Debug, Clone)]
pub struct TxInformation {
pub network: String,
pub address: Address,
pub block_hash: H256,
pub block_number: U64,
pub block_timestamp: Option<U256>,
pub transaction_hash: H256,
pub log_index: U256,
pub transaction_index: U64,
}
As you see the network
is always passed in the tx_information
struct, this is so you can index multiple networks
within the same handler if you wish.
Context
The context
is a struct that is passed to the handler which has thread safe services exposed for ease of use
within the handler.
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
// context = Arc<EventContext<NoExtensions>>
|results, context| async move {
// logic here
return Ok(());
},
no_extensions(),
)
.await,
)
.register(registry);
}
pub struct EventContext<TExtensions>
where
TExtensions: Send + Sync,
{
pub database: Arc<PostgresClient>,
pub csv: Arc<AsyncCsvAppender>,
pub extensions: Arc<TExtensions>,
}
Note here that if you have postgres storage off in the YAML configuration file the database
will not be present in
this struct and you will not be able to use it. The same goes for the csv
if you have csv storage off in the YAML.
Event Callback Result
The callback has to return a Result<(), String>
so it can be handled by rindexer, rindexer by default will keep
retrying the event if it fails with exponential backoff.
Success
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
|results, context| async move {
// logic here
return Ok(());
},
no_extensions(),
)
.await,
)
.register(registry);
}
Error
Error takes in a string which then is logged in the rindexer console to help debugging traces.
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
|results, context| async move {
// logic here
return Err("this is an error".to_string());
},
no_extensions(),
)
.await,
)
.register(registry);
}
Extensions
You can also pass in your own custom thread safe extensions to the context if you wish, this is a way to pass in custom logic. For example say you wanted to use a different database or call something from outside the indexer using an http request then this is the place to pass it in from.
Example below uses the reqwest rust library to make a http request.
use reqwest::blocking::Client;
use std::error::Error;
struct HttpClient {
client: Client,
}
impl HttpClient {
fn new() -> Self {
HttpClient {
client: Client::new(),
}
}
fn get(&self, url: &str) -> Result<String, Box<dyn Error>> {
let response = self.client.get(url).send()?.text()?;
Ok(response)
}
}
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
|results, context| async move {
let response = context.extensions.client.get("https://example.com");
match response {
Ok(response) => {
println!("{}", response),
return Ok(());
}
Err(e) => {
println!("Error: {:?}", e)
return Err(e.to_string());
},
}
},
HttpClient::new(),
)
.await,
)
.register(registry);
}
Network providers
You get exposed to all the network thread safe json rpc providers you have defined in the network YAML configuration file, this allows you to do on chain lookups at indexing time.
This is exposed in the typings
folder. The naming for the provider function is the network name defined in your
YAML configuration file in snake case with get_
prefixed to it and _provider
appended to it.
name: rETHIndexer
description: My first rindexer project
repository: https://github.com/joshstevens19/rindexer
project_type: rust
networks:
- name: ethereum
chain_id: 1
rpc: https://mainnet.gateway.tenderly.co
storage:
postgres:
enabled: true
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: 18900000
end_block: 19000000
abi: ./abis/RocketTokenRETH.abi.json
include_events:
- Transfer
for example with network ethereum
the provider function would be get_ethereum_provider
.
use crate::rindexer_lib::typings::networks::get_ethereum_provider;
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
|results, context| async move {
let provider = get_ethereum_provider();
let chain_id = provider.get_chainid().await;
match chain_id {
Ok(result) => {
println!("Chain id: {:?}", result)
return Ok(());
}
Err(e) => {
println!("Error getting chain id: {:?}", e)
return Err(e.to_string());
}
}
},
no_extensions(),
)
.await,
)
.register(registry);
}
External Contract calls
You can also make contract calls within the handler, this is useful if you want to get the current state of a contract. You get exposed to the contract for the event you are indexing on but you can also use the global YAML to define other contracts you want to use.
name: rETHIndexer
description: My first rindexer project
repository: https://github.com/joshstevens19/rindexer
project_type: rust
networks:
- name: ethereum
chain_id: 1
rpc: https://mainnet.gateway.tenderly.co
storage:
postgres:
enabled: true
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: 18900000
end_block: 19000000
abi: ./abis/RocketTokenRETH.abi.json
include_events:
- Transfer
- Approval
global:
contracts:
- name: USDT
details:
- address: 0xdac17f958d2ee523a2206206994597c13d831ec7
network: ethereum
abi: ./abis/erc20.abi.json
Global contract calls
It as easy as importing the contract and calling the function you want. The naming convention for the contract is the contract name defined in your
YAML configuration file in snake case with _contract
appended to it.
...
global:
contracts:
- name: USDT
details:
- address: 0xdac17f958d2ee523a2206206994597c13d831ec7
network: ethereum
abi: ./abis/erc20.abi.json
use crate::rindexer_lib::typings::global_contracts::usdt_contract;
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
|results, context| async move {
let usdt = usdt_contract();
let name = usdt.name().await;
match name {
Ok(result) => {
println!("USDT name: {:?}", name)
return Ok(());
}
Err(e) => {
println!("Error getting USDT name: {:?}", e)
return Err(e.to_string());
}
}
},
no_extensions(),
)
.await,
)
.register(registry);
}
Multiple addresses
If you have defined multiple addresses for a contract in the YAML configuration file you have to pass in the address into the contract function.
use crate::rindexer_lib::typings::global_contracts::usdt_contract;
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
|results, context| async move {
let address: Address = "0xdac17f958d2ee523a2206206994597c13d831ec7"
.parse()
.expect("Invalid address");
let usdt = usdt_contract(address);
let name = usdt.name().await;
match name {
Ok(result) => {
println!("USDT name: {:?}", name)
return Ok(());
}
Err(e) => {
println!("Error getting USDT name: {:?}", e)
return Err(e.to_string());
}
}
},
no_extensions(),
)
.await,
)
.register(registry);
}
Contract calls
Each event to index is defined in a contract within the YAML configuration file, you can also make calls to this contract
within the handler. The naming convention for the contract is the contract name defined in your YAML configuration file
in snake case with _contract
appended to it.
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: 18900000
end_block: 19000000
abi: ./abis/RocketTokenRETH.abi.json
include_events:
- Transfer
- Approval
use crate::rindexer_lib::typings::rust::events::rocket_pool_eth::rocket_pool_eth_contract;
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
|results, context| async move {
// have to pass in network name here
let rocket_pool_eth = rocket_pool_eth_contract("ethereum");
let name = rocket_pool_eth.name().await;
match name {
Ok(result) => {
println!("rETH name: {:?}", name)
return Ok(());
}
Err(e) => {
println!("Error getting rETH name: {:?}", e)
return Err(e.to_string());
}
}
},
no_extensions(),
)
.await,
)
.register(registry);
}
Multiple addresses
If you have defined multiple addresses or you have a filter for a contract in the YAML configuration file you will have to pass in the address into the contract function.
use crate::rindexer_lib::typings::rust::events::rocket_pool_eth::rocket_pool_eth_contract;
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
|results, context| async move {
let address: Address = "0xdac17f958d2ee523a2206206994597c13d831ec7"
.parse()
.expect("Invalid address");
// have to pass in network name here
let rocket_pool_eth = rocket_pool_eth_contract("ethereum", address);
let name = rocket_pool_eth.name().await;
match name {
Ok(result) => {
println!("rETH name: {:?}", name)
return Ok(());
}
Err(e) => {
println!("Error getting rETH name: {:?}", e)
return Err(e.to_string());
}
}
},
no_extensions(),
)
.await,
)
.register(registry);
}
Postgres
By default if you set the postgres storage on in the YAML configuration file it will generate you a postgres connected client. This uses the tokio-postgres library. This is a great starting point for you to build on. It uses connection pools by default.
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
|results, context| async move {
// database client here
context.database
return Ok(());
},
no_extensions(),
)
.await,
)
.register(registry);
}
Disable Postgres Create Tables
The tables creation can be skipped by using the disable_create_tables in the YAML configuration file. This will generate you a blank handler with no logic inside.
name: rETHIndexer
description: My first rindexer project
repository: https://github.com/joshstevens19/rindexer
project_type: rust
networks:
- name: ethereum
chain_id: 1
rpc: https://mainnet.gateway.tenderly.co
storage:
postgres:
enabled: true
disable_create_tables: true
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: 18900000
end_block: 19000000
abi: ./abis/RocketTokenRETH.abi.json
include_events:
- Transfer
You can query data from the database and write data to the database, here are the postgres methods exposed.
context.database.new
- This is for creating a new client.context.database.batch_execute
- This is for executing multiple queries at once.context.database.execute
- This is for executing a single query.context.database.prepare
- This is for preparing a query to be executed multiple times.context.database.transaction
- This is for starting a transaction.context.database.query
- This is for querying data from the database.context.database.query_one
- This is for querying a single row from the database.context.database.query_one_or_none
- This is for querying a single row from the database or returning None if no rows are found.context.database.batch_insert
- This is for inserting multiple rows into the database.context.database.copy_in
- This is for inserting multiple rows into the database using the COPY command.
EthereumSqlTypeWrapper
Ethereum types are not 1 to 1 with postgres types, so rindexer has a wrapper to help you with this. This is a enum called EthereumSqlTypeWrapper which has all the types you need to pass into the postgres write functions.
#[derive(Debug, Clone)]
pub enum EthereumSqlTypeWrapper {
// Boolean
Bool(bool),
VecBool(Vec<bool>),
// 8-bit integers
U8(u8),
I8(i8),
VecU8(Vec<u8>),
VecI8(Vec<i8>),
// 16-bit integers
U16(u16),
I16(i16),
VecU16(Vec<u16>),
VecI16(Vec<i16>),
// 32-bit integers
U32(u32),
I32(i32),
VecU32(Vec<u32>),
VecI32(Vec<i32>),
// 64-bit integers
U64(U64),
I64(i64),
VecU64(Vec<U64>),
VecI64(Vec<i64>),
// 128-bit integers
U128(u128),
I128(i128),
VecU128(Vec<u128>),
VecI128(Vec<i128>),
// 256-bit integers
U256(U256),
U256Nullable(U256),
U256Bytes(U256),
U256BytesNullable(U256),
I256(I256),
I256Nullable(I256),
I256Bytes(I256),
I256BytesNullable(I256),
VecU256(Vec<U256>),
VecU256Bytes(Vec<U256>),
VecI256(Vec<I256>),
VecI256Bytes(Vec<I256>),
// 512-bit integers
U512(U512),
VecU512(Vec<U512>),
// Hashes
H128(H128),
H160(H160),
H256(H256),
H256Bytes(H256),
H512(H512),
VecH128(Vec<H128>),
VecH160(Vec<H160>),
VecH256(Vec<H256>),
VecH256Bytes(Vec<H256>),
VecH512(Vec<H512>),
// Address
Address(Address),
AddressBytes(Address),
VecAddress(Vec<Address>),
VecAddressBytes(Vec<Address>),
// Strings and Bytes
String(String),
StringNullable(String),
VecString(Vec<String>),
Bytes(Bytes),
BytesNullable(Bytes),
VecBytes(Vec<Bytes>),
DateTime(DateTime<Utc>),
}
// to use it you just pass the value in the enum
// example
EthereumSqlTypeWrapper::Address(result.tx_information.address)
CSV
csv storage is disabled by default in the YAML configuration file, if you turn it on it will generate you a csv client
methods:
context.csv.append_header
- This is for appending a header to the csv file.context.csv.append
- This is for appending a row to the csv file.
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
|results, context| async move {
// csv client here
context.csv
return Ok(());
},
no_extensions(),
)
.await,
)
.register(registry);
}
Disable CSV Create Headers
If you turn on csv storage then by default rindexer will create headers for you automatically inline with the ABI event data. The CSV header creation can be skipped by using the disable_create_headers in the YAML configuration file.
name: rETHIndexer
description: My first rindexer project
repository: https://github.com/joshstevens19/rindexer
project_type: rust
networks:
- name: ethereum
chain_id: 1
rpc: https://mainnet.gateway.tenderly.co
storage:
csv:
enabled: true
disable_create_headers: true
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: 18900000
end_block: 19000000
abi: ./abis/RocketTokenRETH.abi.json
include_events:
- Transfer
register
rindexer needs to know which handlers are required to be indexed so you need to register them with the EventCallbackRegistry
.
This passing of &mut EventCallbackRegistry
is taken care of you by the rindexer framework, you just need to call the register
function.
async fn transfer_handler(registry: &mut EventCallbackRegistry) {
RocketPoolETHEventType::Transfer(
TransferEvent::handler(
|results, context| async move {
...
return Ok(());
},
no_extensions(),
)
.await,
)
.register(registry);
}
The main.rs
calls the register_all_handlers
function which lives in all_handlers.rs
this registers all the handlers, this code is
all generated for you and you do not need to worry about it.
use super::rust::rocket_pool_eth::rocket_pool_eth_handlers;
use rindexer::event::callback_registry::EventCallbackRegistry;
pub async fn register_all_handlers() -> EventCallbackRegistry {
let mut registry = EventCallbackRegistry::new();
rocket_pool_eth_handlers(&mut registry).await;
registry
}
main.rs
The rust project will generate you a main.rs which can be ran out the box. This is just boilerplate code to get you started, you can customise this as you wish and should be if your building a custom indexer.
use std::env;
use self::rindexer_lib::indexers::all_handlers::register_all_handlers;
use rindexer::{
start_rindexer, GraphQLServerDetails, GraphQLServerSettings, IndexingDetails, StartDetails,
};
mod rindexer_lib;
#[tokio::main]
async fn main() {
let args: Vec<String> = env::args().collect();
let mut enable_graphql = false;
let mut enable_indexer = false;
let mut port: Option<u16> = None;
for arg in args.iter() {
match arg.as_str() {
"--graphql" => enable_graphql = true,
"--indexer" => enable_indexer = true,
_ if arg.starts_with("--port=") || arg.starts_with("--p") => {
if let Some(value) = arg.split('=').nth(1) {
let overridden_port = value.parse::<u16>();
match overridden_port {
Ok(overridden_port) => port = Some(overridden_port),
Err(_) => {
println!("Invalid port number");
return;
}
}
}
}
_ => {}
}
}
let path = env::current_dir();
match path {
Ok(path) => {
let manifest_path = path.join("rindexer.yaml");
let result = start_rindexer(StartDetails {
manifest_path: &manifest_path,
override_environment_path: None,
indexing_details: if enable_indexer {
Some(IndexingDetails {
registry: register_all_handlers(&manifest_path).await,
})
} else {
None
},
graphql_details: GraphqlOverrideSettings {
enabled: enable_graphql,
override_port: port,
},
})
.await;
match result {
Ok(_) => {}
Err(e) => {
println!("Error starting rindexer: {:?}", e);
}
}
}
Err(e) => {
println!("Error getting current directory: {:?}", e);
}
}
}
Running
If you want to run this with docker support for the postgres first run:
docker compose up -d
Then to run the boilerplate code generated for you, you can run the following command:
cargo run
Managing changes when generating typings
When you start changing your YAML configuration file and regenerating your typings the indexer functions may break or need editing to match the new typings. You can regenerate the indexers folder but this will overwrite any changes you did. Luckily the rust compiler is very good at telling you what you need to change.