Kafka Integration
Overview
Kafka is a scalable messaging platform. This integration lets you consume FusionAuth webhook events from a Kafka topic, in addition to or instead of receiving the JSON events directly at a configured webhook. For example, when a new user is created in FusionAuth, a “new user” event can be sent from FusionAuth to a Kafka topic.
When the Kafka integration is enabled, all webhook events across all tenants will be sent to Kafka with all fields included.
Configuration
The Kafka integration may be enabled using the Integrations API or through the FusionAuth UI by navigating to Settings -> Integrations -> Kafka.
By default, you’ll see properties set for the Kafka Producer configuration, including bootstrap.servers, max.block.ms, and request.timeout.ms. These tell FusionAuth how to make the initial connection to your Kafka cluster and set how long it can wait to send information to Kafka. You can find a complete list of allowed configuration properties for this block in the Kafka configuration documentation.
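If you prefer the Integrations API route mentioned above, the request might be assembled roughly as follows. This is a minimal sketch, not a confirmed recipe: the endpoint path /api/integration, the PATCH method, and the field names enabled, defaultTopic, and producer are assumptions that should be checked against the Integrations API documentation. The function name and its arguments are hypothetical. The sketch only builds the request and does not send it.

```python
import json
import urllib.request


def build_kafka_integration_request(base_url, api_key, topic, producer):
    """Build (but do not send) a request that enables the Kafka integration.

    Assumption: the endpoint path and the JSON field names below mirror the
    FusionAuth Integrations API; verify them against the API docs.
    """
    body = {
        "integrations": {
            "kafka": {
                "enabled": True,
                "defaultTopic": topic,
                # The same key/value pairs you would enter in the UI.
                "producer": producer,
            }
        }
    }
    return urllib.request.Request(
        url=f"{base_url}/api/integration",
        data=json.dumps(body).encode("utf-8"),
        method="PATCH",
        headers={"Authorization": api_key, "Content-Type": "application/json"},
    )
```

To actually send the request you could pass the returned object to urllib.request.urlopen, using a FusionAuth API key that is permitted to modify integrations.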
Specify a topic that you’ve already created in your Kafka cluster and press “Send test event” to make sure that the connection is working as expected. After seeing that it succeeded, don’t forget to press “Save” in the top right to turn on the Kafka integration.
You should see an event similar to the following in your Kafka topic if the test succeeds.
{"createInstant":1667831017070,"id":"4532ba80-9443-4300-a324-3a2193e56c67","message":"You've successfully configured the Kafka Integration for FusionAuth.","type":"test"}
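If you read the topic programmatically, a small check like the one below can confirm that the test event arrived. It is a sketch under two assumptions: that the message value is the JSON text shown above, and that createInstant is a Unix timestamp in milliseconds. The function names are illustrative only.

```python
import json
from datetime import datetime, timezone


def is_test_event(raw):
    """Return True if the raw JSON message is the integration test event."""
    event = json.loads(raw)
    return event.get("type") == "test"


def created_at(event):
    """Convert createInstant (assumed to be epoch milliseconds) to a UTC datetime."""
    return datetime.fromtimestamp(event["createInstant"] / 1000, tz=timezone.utc)
```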
Example Configuration for Docker Compose
If you’re running Kafka alongside FusionAuth (for example, from the same docker-compose.yaml file), the only thing you need to change in the default configuration is to show FusionAuth where to find the Kafka bootstrap server on the network.
If your Docker Compose file is as follows:
Example docker-compose.yml with Kafka
version: '3'

services:
  db:
    image: postgres:16.0-bookworm
    environment:
      PGDATA: /var/lib/postgresql/data/pgdata
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    healthcheck:
      test: [ "CMD-SHELL", "pg_isready -U postgres" ]
      interval: 5s
      timeout: 5s
      retries: 5
    networks:
      - db_net
    restart: unless-stopped
    volumes:
      - db_data:/var/lib/postgresql/data

  search:
    image: opensearchproject/opensearch:2.11.0
    environment:
      cluster.name: fusionauth
      discovery.type: single-node
      node.name: search
      plugins.security.disabled: true
      bootstrap.memory_lock: true
      OPENSEARCH_JAVA_OPTS: ${OPENSEARCH_JAVA_OPTS}
    healthcheck:
      interval: 10s
      retries: 80
      test: curl --write-out 'HTTP %{http_code}' --fail --silent --output /dev/null http://localhost:9200/
    restart: unless-stopped
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    ports:
      - 9200:9200 # REST API
      - 9600:9600 # Performance Analyzer
    volumes:
      - search_data:/usr/share/opensearch/data
    networks:
      - search_net

  fusionauth:
    image: fusionauth/fusionauth-app:latest
    depends_on:
      - db
      - search
      - kafka
      - zookeeper
    environment:
      DATABASE_URL: jdbc:postgresql://db:5432/fusionauth
      DATABASE_ROOT_USERNAME: ${POSTGRES_USER}
      DATABASE_ROOT_PASSWORD: ${POSTGRES_PASSWORD}
      DATABASE_USERNAME: ${DATABASE_USERNAME}
      DATABASE_PASSWORD: ${DATABASE_PASSWORD}
      FUSIONAUTH_APP_MEMORY: ${FUSIONAUTH_APP_MEMORY}
      FUSIONAUTH_APP_RUNTIME_MODE: ${FUSIONAUTH_APP_RUNTIME_MODE}
      FUSIONAUTH_APP_URL: http://fusionauth:9011
      SEARCH_SERVERS: http://search:9200
      SEARCH_TYPE: elasticsearch
      FUSIONAUTH_APP_KICKSTART_FILE: ${FUSIONAUTH_APP_KICKSTART_FILE}
    networks:
      - db_net
      - search_net
    restart: unless-stopped
    ports:
      - 9011:9011
    volumes:
      - fusionauth_config:/usr/local/fusionauth/config
      - ./kickstart:/usr/local/fusionauth/kickstart

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
    networks:
      - db_net

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      db_net:
        aliases:
          - kafka

networks:
  db_net:
    driver: bridge
  search_net:
    driver: bridge

volumes:
  db_data:
  fusionauth_config:
  search_data:
Then you would input the following configuration in the FusionAuth UI to configure the Kafka integration.
bootstrap.servers=kafka:9092
max.block.ms=5000
request.timeout.ms=2000
Example Configuration for a Remote Managed Kafka Integration
If you’re using a managed service for Kafka that runs on a different server than your FusionAuth installation, you’ll also need to specify credentials for connecting to the remote Kafka instance. You should be able to get the exact configuration you need from your Kafka hosting provider by looking for “Producer configuration” or similar. It should look similar to the following.
bootstrap.servers=pkc-6ojv2.us-west4.gcp.your-kafka-provider.cloud:9092
client.dns.lookup=use_all_dns_ips
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='ZN6LJ5UHSXZLW3LR' password='M9T8b85OPspFAS37Do5Baq7jIS+hl7h7bY8MRrfVff5lz8xeCwea7zB5AC3nKXUD';
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
session.timeout.ms=45000
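Before pasting a provider-supplied block into FusionAuth, it can help to sanity-check it. The sketch below parses simple key=value lines into a dictionary and reports which SASL properties are missing when security.protocol starts with SASL. It is a hypothetical helper, not part of FusionAuth or Kafka, and it assumes one property per line with = as the separator (it does not handle the full Java properties syntax). Splitting on the first = only matters because values such as sasl.jaas.config can themselves contain = characters.

```python
def parse_producer_properties(text):
    """Parse simple key=value lines into a dict, skipping blanks and # comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so values may contain "=".
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"Not a key=value line: {line!r}")
        config[key.strip()] = value.strip()
    return config


def missing_sasl_keys(config):
    """List SASL properties that are absent when a SASL protocol is selected."""
    if not config.get("security.protocol", "").startswith("SASL"):
        return []
    required = ("sasl.mechanism", "sasl.jaas.config")
    return [key for key in required if key not in config]
```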
Event Types and Configuration
After successfully connecting to a Kafka instance, you’ll be notified of each event that happens in FusionAuth. Events will generally contain fields for:
- id: A unique Id for that event that can be used for deduplication.
- createInstant: A timestamp indicating when the event occurred.
- type: The kind of event that occurred.
- info: A map of extra information about the event and its source, such as IP address and device information.
Other fields applicable to the event may also be included. You can find the full schema for each event in the webhook events documentation.
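Because each event carries a unique id, a consumer can use it to skip events it has already processed. The following is a minimal in-memory sketch of that idea. The class name and the max_ids bound are illustrative assumptions, and a production consumer would likely need a persistent store instead.

```python
from collections import OrderedDict


class EventDeduplicator:
    """Remember recently seen event ids, evicting the oldest beyond max_ids."""

    def __init__(self, max_ids=10000):
        self._seen = OrderedDict()
        self._max_ids = max_ids

    def is_new(self, event_id):
        """Return True the first time an id is seen, False on repeats."""
        if event_id in self._seen:
            return False
        self._seen[event_id] = None
        if len(self._seen) > self._max_ids:
            # Drop the oldest remembered id to keep memory bounded.
            self._seen.popitem(last=False)
        return True
```

A consumer loop would call is_new(event["id"]) and process the event only when it returns True.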
Events can be categorized into two broad types:
- System events, which include audit log events and event log data.
- Tenant-based events, which include detailed information about user creation, removal, or changes.
By default, system events will be sent to Kafka without further configuration. Tenant events, however, are dependent on a Webhook being set up and active. The events sent to Kafka then follow the configuration of events for that webhook and tenant. If you don’t already have a webhook configured and want to use Kafka for tenant-level events, we recommend you set up a no-op webhook receiver that accepts the incoming POST request, discards it, and returns a 200 OK status. This will allow you to set up a dummy webhook configuration to control Kafka tenant-level events. To create such a receiver, you can use a low-code platform such as Pipedream or Zapier, or roll your own.
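If you choose to roll your own receiver, something as small as the following may be enough. It is a sketch using only the Python standard library: it reads and discards the POST body and replies with 200 OK. The handler name, the build_server helper, and the default port 8000 are arbitrary choices for illustration, and the sketch does no authentication or TLS.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer


class NoOpWebhookHandler(BaseHTTPRequestHandler):
    """Accept any POST, discard the body, and answer 200 OK."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        self.rfile.read(length)  # Read and discard the webhook payload.
        self.send_response(200)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def log_message(self, format, *args):
        # Silence the default per-request logging.
        pass


def build_server(host="0.0.0.0", port=8000):
    """Create the HTTP server; call serve_forever() on the result to run it."""
    return HTTPServer((host, port), NoOpWebhookHandler)
```

Running build_server().serve_forever() starts the receiver. You would then point the FusionAuth webhook URL at an address that FusionAuth can reach on that port.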
Example Events Sent to Kafka
After creating the integration and using FusionAuth, your Kafka topic might look similar to the following, which shows events for:
- Sending the initial test event.
- The audit log event for creating a new user with the email address newuser@example.com. This is a system-level event.
- The tenant-level event for creating the above user.
- An error event because the SMTP integration isn’t working so the password reset email couldn’t be sent to the new user. This is a system-level event.
{"createInstant":1667833973280,"id":"e6a4f780-02da-4b5a-8b04-94d2a49ea369","message":"You've successfully configured the Kafka Integration for FusionAuth.","type":"test"}
{"event":{"auditLog":{"id":38,"insertInstant":1667834917902,"insertUser":"test@example.com","message":"Created user with Id [3cbb85e7-ebf8-4c92-bc75-7ca8db4399db], name [null] and loginId [newuser@example.com]","reason":"FusionAuth User Interface"},"createInstant":1667834917903,"id":"4b81d279-24c7-463b-847a-0cecaaf113a0","info":{"ipAddress":"192.168.16.1","userAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15"},"type":"audit-log.create"}}
{"event":{"createInstant":1667834917903,"id":"0c11627f-9461-4a00-8156-00ff6c3d68d3","info":{"ipAddress":"172.22.0.1","userAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15"},"tenantId":"2671a63f-084c-4434-9465-fde65b8845ee","type":"user.create","user":{"active":true,"connectorId":"e3306678-a53a-4964-9040-1c96f36dda72","email":"newuser@example.com","id":"7e512f97-79f7-42e5-891f-a2383ed3460c","insertInstant":1667834917902,"lastLoginInstant":1667834917902,"lastUpdateInstant":1667834917902,"passwordChangeRequired":false,"passwordLastUpdateInstant":1667834917902,"tenantId":"2671a63f-084c-4434-9465-fde65b8845ee","twoFactor":{},"usernameStatus":"ACTIVE","verified":true}}}
{"event":{"createInstant":1667834917916,"eventLog":{"id":34,"insertInstant":1667834917913,"message":"Async Email Send exception occurred.\n\nTemplate Id: 3e6462be-178c-499f-92c9-3643ccca8ced\nTemplate Name: [FusionAuth Default] Setup Password\nTenant Id: 6825d48e-4df4-f83e-1055-f1d42e363749\nAddressed to: newuser@example.com\n\nCause:\ncom.sun.mail.util.MailConnectException : Message: Couldn't connect to host, port: localhost, 25; timeout -1","type":"Error"},"id":"44ca31e5-967c-4b5c-8ff4-1ee51d73999a","type":"event-log.create"}}
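In the sample above, the test event is a top-level JSON object, while the other messages wrap their payload in an "event" key. A consumer may want to normalize the two shapes before inspecting type. The sketch below does that and counts messages per type. The function names are illustrative, and it assumes one JSON document per line, as in the sample output.

```python
import json
from collections import Counter


def unwrap(raw):
    """Return the event dict, whether or not it is wrapped in an "event" key."""
    payload = json.loads(raw)
    return payload.get("event", payload)


def count_by_type(lines):
    """Count events per type across an iterable of JSON lines, skipping blanks."""
    return Counter(unwrap(line)["type"] for line in lines if line.strip())
```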
Troubleshooting FusionAuth’s Kafka Integration
Kafka is a powerful but complicated piece of software and you’ll need Kafka expertise to run a production Kafka setup to consume all of your FusionAuth events. If you get stuck integrating Kafka, try the following.
Troubleshooting the FusionAuth Side of the Integration
- Check that you included a topic when adding the integration.
- Check that you saved the integration after configuring and testing it.
- Check the FusionAuth logs for errors relating to Kafka.
- If you’re using Docker Compose, check that you’ve correctly mapped the ports for both Kafka and Zookeeper and correctly configured a Docker network so that FusionAuth can connect to Kafka.
Troubleshooting Your Kafka Installation
It can be useful to run quick commands directly against your Kafka cluster to create topics and log events. You can download a version of Kafka from the official Kafka website and extract it to your local machine. This will give you some utility scripts to directly interact with your Kafka cluster.
If you’re using a Docker Compose setup locally, add an entry to your hosts file to map kafka to 127.0.0.1.
127.0.0.1 kafka
You can then create a topic (for example, fa-events) that you can use in FusionAuth by running the following.
bin/kafka-topics.sh --create --topic fa-events --bootstrap-server kafka:9092
You can set a consumer to watch for new events with the following command. If everything is correctly set up, you’ll see events streamed to this consumer and logged to your shell as you carry out actions in FusionAuth.
bin/kafka-console-consumer.sh --topic fa-events --from-beginning --bootstrap-server kafka:9092
Or if you need to talk to a remote Kafka cluster, you can create a file locally called consumer.properties with credentials for your remote cluster.
bootstrap.servers=pkc-6ojv2.us-west4.gcp.confluent.cloud:9092
client.dns.lookup=use_all_dns_ips
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='ZN6LJ5UHSXWLW2LR' password='M9T8b75OPspFAS37Do5Baq7jIS+hi7h7bY8MRrfVff5lz8xeCweaRTO8GD3nKXUD';
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
session.timeout.ms=45000
And then run the scripts above, passing in the file with the --consumer.config flag and the remote bootstrap server. For example, use the following to see events posted to your fa-events topic in a remote cluster.
bin/kafka-console-consumer.sh --bootstrap-server pkc-6ojv2.us-west4.gcp.confluent.cloud:9092 --consumer.config consumer.properties --topic fa-events