Kafka
Kafka streams do not work with windows from the CLI installation, it will panic if you try to use it with windows. If you are on windows and want to use kafka streams you should use the docker image.
rindexer allows you to configure Kafka to stream any data to. This goes under the contracts section of the YAML configuration file.
Find out more about Kafka.
rindexer kafka integration supports SSL queues and none SSL queues.
Configuration with rindexer
kafka
property accepts an array of topics
allowing you to split up the streams any way you wish.
Example
Kafka has to be configured to use SASL_SSL or PLAINTEXT. You can read more about it here.
name: RocketPoolETHIndexer
description: My first rindexer project
repository: https://github.com/joshstevens19/rindexer
project_type: no-code
networks:
- name: ethereum
chain_id: 1
rpc: https://mainnet.gateway.tenderly.co
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
acks: all
security_protocol: PLAINTEXT
topics:
- topic: test-topic
# key is optional
key: my-routing-key
networks:
- ethereum
events:
- event_name: Transfer
Response
The response sent to you is already decoded and parsed into a JSON object.
event_name
- The name of the eventevent_signature_hash
- The event signature hash example the keccak256 hash of "Transfer(address,address,uint256)", this is topics[0] in the logsevent_data
- The event data which has all the event fields decoded and the transaction information which is undertransaction_information
network
- The network the event was emitted on
For example a transfer event would look like:
{
"event_name": "Transfer",
"event_signature_hash": "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
"event_data": {
"from": "0x0338ce5020c447f7e668dc2ef778025ce3982662",
"to": "0x0338ce5020c447f7e668dc2ef778025ce3982662",
"value": "1000000000000000000",
"transaction_information": {
"address": "0xae78736cd615f374d3085123a210448e74fc6393",
"block_hash": "0x8461da7a1d4b47190a01fa6eae219be40aacffab0dd64af7259b2d404572c3d9",
"block_number": "18718011",
"log_index": "0",
"network": "ethereum",
"transaction_hash": "0x145c6705ffbf461e85d08b4a7f5850d6b52a7364d93a057722ca1194034f3ba4",
"transaction_index": "0"
}
},
"network": "ethereum"
}
brokers
You define the kafka brokers you wish to connect to, you can pass in multiple brokers if you wish. A single broker will of course work as well.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
acks
acks=0
- When acks=0 producers consider messages as "written successfully" the moment the message was sent without waiting for the broker to accept it at all.acks=1
- When acks=1 , producers consider messages as "written successfully" when the message was acknowledged by only the leader.acks=all
- When acks=all, producers consider messages as "written successfully" when the message is accepted by all in-sync replicas (ISR).
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
# all or 0 or 1
acks: all
security_protocol: SASL_SSL
security_protocol
This is either PLAINTEXT
or SASL_SSL
. You can read more about it here.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
acks: all
security_protocol: SASL_SSL
sasl_mechanisms
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
acks: all
security_protocol: SASL_SSL
sasl_mechanisms: PLAIN
sasl_username
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
acks: all
security_protocol: SASL_SSL
sasl_mechanisms: PLAIN
sasl_username: lt;CLUSTER_API_KEY>
sasl_password
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
acks: all
security_protocol: SASL_SSL
sasl_mechanisms: PLAIN
sasl_username: lt;CLUSTER_API_KEY>
sasl_password: lt;CLUSTER_API_SECRET>
topics
This is an array of topics you want to stream to this kafka.
topic
This is the topic name.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
acks: all
security_protocol: SASL_SSL
sasl_mechanisms: PLAIN
sasl_username: lt;CLUSTER_API_KEY>
sasl_password: lt;CLUSTER_API_SECRET>
topics:
- topic: test-topic
key
You can route your messages to a specific partition in the topic, this is useful if you have multiple consumers on the same topic.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
acks: all
security_protocol: SASL_SSL
sasl_mechanisms: PLAIN
sasl_username: lt;CLUSTER_API_KEY>
sasl_password: lt;CLUSTER_API_SECRET>
topics:
- topic: test-topic
key: my-routing-key
networks:
- ethereum
events:
- event_name: Transfer
networks
This is an array of networks you want to stream to this kafka.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
acks: all
security_protocol: SASL_SSL
sasl_mechanisms: PLAIN
sasl_username: lt;CLUSTER_API_KEY>
sasl_password: lt;CLUSTER_API_SECRET>
topics:
- topic: test-topic
key: my-routing-key
networks:
- ethereum
events:
- event_name: Transfer
events
This is an array of events you want to stream to this kafka.
event_name
This is the name of the event you want to stream to this kafka, must match the ABI event name.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
acks: all
security_protocol: SASL_SSL
sasl_mechanisms: PLAIN
sasl_username: lt;CLUSTER_API_KEY>
sasl_password: lt;CLUSTER_API_SECRET>
topics:
- topic: test-topic
key: my-routing-key
networks:
- ethereum
events:
- event_name: Transfer
conditions
This accepts an array of conditions you want to apply to the event data before streaming to this kafka.
You may want to filter on the stream based on the event data, if the event data has not got an index on the on the
solidity event you can not filter it over the logs. The conditions
filter is here to help you with this,
based on your ABI you can filter on the event data.
rindexer has enabled a special syntax which allows you to define on your ABI fields what you want to filter on.
>
- higher then (for numbers only)<
- lower then (for numbers only)=
- equals>=
- higher then or equals (for numbers only)<=
- lower then or equals (for numbers only)||
- or&&
- and
So lets look at an example lets say i only want to get transfer events which are higher then 2000000000000000000
RETH wei
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
acks: all
security_protocol: SASL_SSL
sasl_mechanisms: PLAIN
sasl_username: lt;CLUSTER_API_KEY>
sasl_password: lt;CLUSTER_API_SECRET>
topics:
- topic: test-topic
key: my-routing-key
networks:
- ethereum
events:
- event_name: Transfer
conditions:
- "value": ">=2000000000000000000"
We use the ABI input name value
to filter on the value field, you can find these names in the ABI file.
{
"anonymous":false,
"inputs":[
{
"indexed":true,
"internalType":"address",
"name":"from",
"type":"address"
},
{
"indexed":true,
"internalType":"address",
"name":"to",
"type":"address"
},
{
"indexed":false,
"internalType":"uint256",
"name":"value",
"type":"uint256"
}
],
"name":"Transfer",
"type":"event"
}
You can use the ||
or &&
to combine conditions.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
acks: all
security_protocol: SASL_SSL
sasl_mechanisms: PLAIN
sasl_username: lt;CLUSTER_API_KEY>
sasl_password: lt;CLUSTER_API_SECRET>
topics:
- topic: test-topic
key: my-routing-key
networks:
- ethereum
events:
- event_name: Transfer
conditions:
- "value": ">=2000000000000000000 && value <=4000000000000000000"
You can use the =
to filter on other aspects like the from
or to
address.
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
acks: all
security_protocol: SASL_SSL
sasl_mechanisms: PLAIN
sasl_username: lt;CLUSTER_API_KEY>
sasl_password: lt;CLUSTER_API_SECRET>
topics:
- topic: test-topic
key: my-routing-key
networks:
- ethereum
events:
- event_name: Transfer
conditions:
- "from": "0x0338ce5020c447f7e668dc2ef778025ce3982662 || 0x0338ce5020c447f7e668dc2ef778025ce398266u"
- "value": ">=2000000000000000000 || value <=4000000000000000000"
If you have a tuple and you want to get that value you just use the object notation.
For example lets say we want to only get the events for profileId
from the quoteParams
tuple which equals 1
:
{
"anonymous": false,
"inputs": [
{
"components": [
{
"internalType": "uint256",
"name": "profileId",
"type": "uint256"
},
...
],
"indexed": false,
"internalType": "struct Types.QuoteParams",
"name": "quoteParams",
"type": "tuple"
},
...
],
"name": "QuoteCreated",
"type": "event"
}
...
contracts:
- name: RocketPoolETH
details:
- network: ethereum
address: "0xae78736cd615f374d3085123a210448e74fc6393"
start_block: "18600000"
end_block: "18600181"
abi: "./abis/RocketTokenRETH.abi.json"
include_events:
- Transfer
streams:
kafka:
brokers:
- ${KAFKA_BROKER_URL_1}
- ${KAFKA_BROKER_URL_2}
acks: all
security_protocol: SASL_SSL
sasl_mechanisms: PLAIN
sasl_username: lt;CLUSTER_API_KEY>
sasl_password: lt;CLUSTER_API_SECRET>
topics:
- topic: test-topic
key: my-routing-key
networks:
- ethereum
events:
- event_name: Transfer
conditions:
- "quoteParams.profileId": "=1"