Kafka Integration
Add Kafka helpers to an existing PolePosition project:
polepos add integration kafka
The command creates:
src/<package>/integrations/kafka/
__init__.py
consumer.py
factory.py
producer.py
schemas.py
testing.py
It also updates:
src/<package>/settings.py.env.examplepyproject.toml
Dependency
The command adds:
aiokafka>=0.12.0
Sync dependencies after adding the integration:
uv sync
Settings
Review the Kafka values in .env:
KAFKA_ENABLED=false
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_CLIENT_ID=<package>
KAFKA_DEFAULT_TOPIC=<package>.events
KAFKA_GROUP_ID=<package>
KAFKA_AUTO_OFFSET_RESET=earliest
KAFKA_ACKS=all
# KAFKA_COMPRESSION_TYPE=
KAFKA_REQUEST_TIMEOUT_MS=40000
Use the Producer
The generated factory builds a producer from settings. Keep producer lifetime management explicit in your route, service, or application wiring.
Use the generated schemas to keep event payloads predictable.
Consumers and Workers
The generated consumer helper is scaffolding for a worker surface. It is not started automatically by the FastAPI app.
For production, run consumers as explicit worker processes or jobs so API startup remains fast and predictable.
Testing
Use InMemoryKafkaEventProducer from testing.py when unit tests should assert
published events without connecting to Kafka.
Validate
polepos check
The check command validates Kafka files, settings, env values, and dependency signals without connecting to Kafka.