Kafka

Kafka streams do not work on Windows with the CLI installation; rindexer will panic if you try to use them there. If you are on Windows and want to use Kafka streams, you should use the Docker image.

rindexer allows you to configure Kafka and stream any data to it. This goes under the contracts section of the YAML configuration file.

Find out more about Kafka.

The rindexer Kafka integration supports both SSL and non-SSL connections.

Configuration with rindexer

The kafka property accepts an array of topics, allowing you to split up the streams any way you wish.

Example

Kafka has to be configured to use SASL_SSL or PLAINTEXT. You can read more about it here.

non-ssl
name: RocketPoolETHIndexer
description: My first rindexer project
repository: https://github.com/joshstevens19/rindexer
project_type: no-code
networks:
- name: ethereum
  chain_id: 1
  rpc: https://mainnet.gateway.tenderly.co
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers: 
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}
      acks: all
      security_protocol: PLAINTEXT
      topics: 
        - topic: test-topic
          # key is optional
          key: my-routing-key
          networks: 
            - ethereum
          events: 
            - event_name: Transfer

Response

Each message sent to you is already decoded and parsed into a JSON object.

  • event_name - The name of the event
  • event_data - The event data which has all the event fields decoded and the transaction information which is under transaction_information
  • network - The network the event was emitted on

For example, a Transfer event would look like:

{
    "event_name": "Transfer",
    "event_data": {
        "from": "0x0338ce5020c447f7e668dc2ef778025ce3982662",
        "to": "0x0338ce5020c447f7e668dc2ef778025ce3982662",
        "value": "1000000000000000000",
        "transaction_information": {
            "address": "0xae78736cd615f374d3085123a210448e74fc6393",
            "block_hash": "0x8461da7a1d4b47190a01fa6eae219be40aacffab0dd64af7259b2d404572c3d9",
            "block_number": "18718011",
            "log_index": "0",
            "network": "ethereum",
            "transaction_hash": "0x145c6705ffbf461e85d08b4a7f5850d6b52a7364d93a057722ca1194034f3ba4",
            "transaction_index": "0"
        }
    },
    "network": "ethereum"
}
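
Any Kafka client can consume these messages. Below is a minimal consumer sketch (not part of rindexer; it assumes the kafka-python package, the test-topic name from the examples, and a hypothetical local PLAINTEXT broker):

consumer.py
import json

from kafka import KafkaConsumer  # pip install kafka-python

# Broker address and topic name are assumptions taken from the examples.
consumer = KafkaConsumer(
    "test-topic",
    bootstrap_servers=["localhost:9092"],
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

for message in consumer:
    event = message.value  # the decoded JSON object shown above
    print(event["event_name"], event["network"], event["event_data"]["value"])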

brokers

You define the Kafka brokers you wish to connect to; you can pass in multiple brokers, and a single broker will of course work as well.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers: 
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}

acks

  • acks=0 - With acks=0, producers consider messages "written successfully" the moment the message is sent, without waiting for the broker to accept it at all.
  • acks=1 - With acks=1, producers consider messages "written successfully" once the message is acknowledged by the leader only.
  • acks=all - With acks=all, producers consider messages "written successfully" once the message is accepted by all in-sync replicas (ISR).
rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers:
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}
      # all or 0 or 1
      acks: all
      security_protocol: SASL_SSL
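
To see what each setting trades off, here is the equivalent producer-side setting in a generic Kafka client (an illustration only, not rindexer's internals; kafka-python and the local broker address are assumptions):

producer.py
from kafka import KafkaProducer  # pip install kafka-python

# acks mirrors the rindexer setting: 0 (fire and forget), 1 (leader only),
# or "all" (wait for all in-sync replicas) for the strongest guarantee.
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],  # assumption: local broker
    acks="all",
)
producer.send("test-topic", b'{"event_name": "Transfer"}')
producer.flush()  # blocks until the broker acknowledges per the acks setting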

security_protocol

This is either PLAINTEXT or SASL_SSL. You can read more about it here.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers:
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}
      acks: all
      security_protocol: SASL_SSL

sasl_mechanisms

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers:
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}
      acks: all
      security_protocol: SASL_SSL
      sasl_mechanisms: PLAIN

sasl_username

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers:
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}
      acks: all
      security_protocol: SASL_SSL
      sasl_mechanisms: PLAIN
      sasl_username: <CLUSTER_API_KEY>

sasl_password

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers:
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}
      acks: all
      security_protocol: SASL_SSL
      sasl_mechanisms: PLAIN
      sasl_username: <CLUSTER_API_KEY>
      sasl_password: <CLUSTER_API_SECRET>
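
A consumer connecting to the same SASL_SSL cluster needs matching credentials. A hedged sketch with kafka-python (the environment variable names simply mirror the placeholders above and are assumptions):

consumer.py
import os

from kafka import KafkaConsumer  # pip install kafka-python

# Credentials mirror the sasl_username / sasl_password placeholders above.
consumer = KafkaConsumer(
    "test-topic",
    bootstrap_servers=os.environ["KAFKA_BROKER_URL_1"],
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["CLUSTER_API_KEY"],
    sasl_plain_password=os.environ["CLUSTER_API_SECRET"],
)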

topics

This is an array of topics you want to stream to on this Kafka cluster.

topic

This is the topic name.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers:
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}
      acks: all
      security_protocol: SASL_SSL
      sasl_mechanisms: PLAIN
      sasl_username: <CLUSTER_API_KEY>
      sasl_password: <CLUSTER_API_SECRET>
      topics:
        - topic: test-topic

key

You can route your messages to a specific partition in the topic by setting a key; this is useful if you have multiple consumers on the same topic.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers:
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}
      acks: all
      security_protocol: SASL_SSL
      sasl_mechanisms: PLAIN
      sasl_username: <CLUSTER_API_KEY>
      sasl_password: <CLUSTER_API_SECRET>
      topics:
        - topic: test-topic
          key: my-routing-key
          networks:
            - ethereum
          events:
            - event_name: Transfer
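
With Kafka's default partitioner, messages that share a key always land on the same partition, so ordering is preserved per key. You can observe this from the consumer side (a sketch, again assuming kafka-python and the example topic):

consumer.py
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer("test-topic", bootstrap_servers=["localhost:9092"])

for message in consumer:
    # Every message produced with the key "my-routing-key" shows the same
    # partition number here.
    print(message.key, message.partition, message.offset)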

networks

This is an array of networks whose events you want to stream to this topic.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers:
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}
      acks: all
      security_protocol: SASL_SSL
      sasl_mechanisms: PLAIN
      sasl_username: <CLUSTER_API_KEY>
      sasl_password: <CLUSTER_API_SECRET>
      topics:
        - topic: test-topic
          key: my-routing-key
          networks: 
            - ethereum
          events:
            - event_name: Transfer

events

This is an array of events you want to stream to this topic.

event_name

This is the name of the event you want to stream; it must match the ABI event name.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers:
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}
      acks: all
      security_protocol: SASL_SSL
      sasl_mechanisms: PLAIN
      sasl_username: <CLUSTER_API_KEY>
      sasl_password: <CLUSTER_API_SECRET>
      topics:
        - topic: test-topic
          key: my-routing-key
          networks:
            - ethereum
          events: 
            - event_name: Transfer

conditions

This accepts an array of conditions you want to apply to the event data before streaming to this topic.

You may want to filter the stream based on the event data. If a field is not indexed in the Solidity event, you cannot filter on it over the logs. The conditions filter is here to help you with this: based on your ABI, you can filter on the event data.

rindexer supports a special syntax which allows you to define, on your ABI fields, what you want to filter on.

  1. > - greater than (for numbers only)
  2. < - less than (for numbers only)
  3. = - equals
  4. >= - greater than or equals (for numbers only)
  5. <= - less than or equals (for numbers only)
  6. || - or
  7. && - and

Let's look at an example: say I only want Transfer events with a value greater than 2000000000000000000 RETH wei.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers:
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}
      acks: all
      security_protocol: SASL_SSL
      sasl_mechanisms: PLAIN
      sasl_username: <CLUSTER_API_KEY>
      sasl_password: <CLUSTER_API_SECRET>
      topics:
        - topic: test-topic
          key: my-routing-key
          networks:
            - ethereum
          events: 
            - event_name: Transfer
              conditions: 
                - "value": ">=2000000000000000000"

We use the ABI input name value to filter on the value field; you can find these names in the ABI file.

{
    "anonymous":false,
    "inputs":[
      {
        "indexed":true,
        "internalType":"address",
        "name":"from",
        "type":"address"
      },
      {
        "indexed":true,
        "internalType":"address",
        "name":"to",
        "type":"address"
      },
      {
        "indexed":false,
        "internalType":"uint256",
        "name":"value", 
        "type":"uint256"
      }
    ],
    "name":"Transfer",
    "type":"event"
}
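
If you are unsure which names are available to filter on, you can list them straight from the ABI file (a small stdlib-only helper sketch; the file path is taken from the example configs):

list_event_fields.py
import json

# Path taken from the example configs above.
with open("./abis/RocketTokenRETH.abi.json") as f:
    abi = json.load(f)

for entry in abi:
    if entry.get("type") == "event":
        names = [inp["name"] for inp in entry.get("inputs", [])]
        print(entry["name"], "->", ", ".join(names))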

You can use || or && to combine conditions.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers:
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}
      acks: all
      security_protocol: SASL_SSL
      sasl_mechanisms: PLAIN
      sasl_username: <CLUSTER_API_KEY>
      sasl_password: <CLUSTER_API_SECRET>
      topics:
        - topic: test-topic
          key: my-routing-key
          networks:
            - ethereum
          events: 
            - event_name: Transfer
              conditions: 
                - "value": ">=2000000000000000000 && value <=4000000000000000000"

You can use = to filter on other fields like the from or to address.

rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers:
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}
      acks: all
      security_protocol: SASL_SSL
      sasl_mechanisms: PLAIN
      sasl_username: <CLUSTER_API_KEY>
      sasl_password: <CLUSTER_API_SECRET>
      topics:
        - topic: test-topic
          key: my-routing-key
          networks:
            - ethereum
          events: 
            - event_name: Transfer
              conditions: 
                - "from": "0x0338ce5020c447f7e668dc2ef778025ce3982662 || 0x0338ce5020c447f7e668dc2ef778025ce398266u"
                - "value": ">=2000000000000000000 || value <=4000000000000000000"

If you have a tuple and you want to filter on a value inside it, you just use object notation.

For example, let's say we only want the events where profileId from the quoteParams tuple equals 1:

{
     "anonymous": false,
     "inputs": [
       {
         "components": [
           {
             "internalType": "uint256",
             "name": "profileId", 
             "type": "uint256"
           },
           ...
         ],
         "indexed": false,
         "internalType": "struct Types.QuoteParams",
         "name": "quoteParams", 
         "type": "tuple"
       },
       ...
     ],
     "name": "QuoteCreated", 
     "type": "event"
}
rindexer.yaml
...
contracts:
- name: RocketPoolETH
  details:
  - network: ethereum
    address: "0xae78736cd615f374d3085123a210448e74fc6393"
    start_block: "18600000"
    end_block: "18600181"
  abi: "./abis/RocketTokenRETH.abi.json"
  include_events:
  - Transfer
  streams: 
    kafka: 
      brokers:
        - ${KAFKA_BROKER_URL_1}
        - ${KAFKA_BROKER_URL_2}
      acks: all
      security_protocol: SASL_SSL
      sasl_mechanisms: PLAIN
      sasl_username: <CLUSTER_API_KEY>
      sasl_password: <CLUSTER_API_SECRET>
      topics:
        - topic: test-topic
          key: my-routing-key
          networks:
            - ethereum
          events: 
            - event_name: Transfer
              conditions: 
                - "quoteParams.profileId": "=1"