Kafka I/O
StateFun functions consume from and emit to Kafka through declarative module.yaml specs. Delivery is exactly-once when paired with transactional Kafka and Flink checkpointing.
Ingress
kind: io.statefun.kafka.v1/ingress
spec:
  id: example/orders
  address: kafka.svc:9092
  consumerGroupId: example-orders
  startupPosition:
    type: latest  # or earliest, group-offsets, specific-offsets
  topics:
    - topic: example.orders
      valueType: example/Order  # registered SDK type
      targets:
        - example/order-handler
Each entry under topics: maps inbound records from one topic to one or more target functions, each written as namespace/name. The valueType declares how StateFun decodes the record value; it is typically a Protobuf type registered in your SDK code.
Startup position
| startupPosition.type | Meaning |
|---|---|
| latest | Consume only records produced after the consumer starts |
| earliest | Consume from the beginning of each partition |
| group-offsets | Resume from the consumer group's last committed offset (default for restarts) |
| specific-offsets | Pin specific partition offsets (advanced) |
| at-timestamp | Resume from records produced at or after a wall-clock time |
Startup-from-timestamp example
Egress
kind: io.statefun.kafka.v1/egress
spec:
  id: example/notifications
  address: kafka.svc:9092
  deliverySemantic:
    type: exactly-once
    transactionTimeoutMillis: 60000
Functions emit to a Kafka egress via the SDK:
// The builder's build() returns an EgressMessage addressed to the egress id.
EgressMessage outbound = KafkaEgressMessage.forEgress(
        TypeName.typeNameFromString("example/notifications"))
    .withTopic("example.notifications")
    .withKey(orderId)
    .withValue(notificationPayload)
    .build();
ctx.send(outbound);
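The runtime writes egress records inside Kafka transactions and commits them when a Flink checkpoint completes. This gives exactly-once delivery, provided the egress is configured as exactly-once and Flink checkpointing runs in exactly-once mode.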
At-least-once vs exactly-once
| deliverySemantic.type | Trade-off |
|---|---|
| exactly-once | Strongest guarantee. Slightly higher producer latency due to transactions. Default for production. |
| at-least-once | Higher throughput. Duplicates possible on JM failover (the runtime replays the last checkpoint window). |
Transaction timeout
For exactly-once, set transactionTimeoutMillis higher than your Flink checkpoint interval, but lower than the Kafka broker's transaction.max.timeout.ms (default 15 min). 60 s is a good starting point for sub-minute checkpoint intervals.
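The bounds on the transaction timeout can be checked with simple arithmetic before deploying. The following is a minimal sketch, not part of the StateFun or Kafka APIs: the class and method names are hypothetical, and the only external fact it relies on is the Kafka broker default of 15 minutes for transaction.max.timeout.ms.

```java
/** Hypothetical helper for sanity-checking an exactly-once egress timeout. */
final class TransactionTimeoutCheck {
    /** Kafka broker default for transaction.max.timeout.ms (15 minutes). */
    static final long BROKER_DEFAULT_MAX_MILLIS = 15L * 60L * 1000L;

    private TransactionTimeoutCheck() {}

    /**
     * Returns true when the transaction timeout is strictly greater than the
     * checkpoint interval and does not exceed the broker maximum.
     */
    static boolean isValid(long checkpointIntervalMillis,
                           long transactionTimeoutMillis,
                           long brokerMaxMillis) {
        return transactionTimeoutMillis > checkpointIntervalMillis
            && transactionTimeoutMillis <= brokerMaxMillis;
    }
}
```

With a 30 s checkpoint interval, a 60000 ms timeout passes the check. With a 2 min checkpoint interval it fails, because an open transaction could time out before the checkpoint that commits it completes.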
Patterns
Routing to multiple namespaces
TargetFunctions.fromPatternString accepts <namespace>/<name> or <namespace>/*. Comma-lists and wildcard namespaces (*/foo) are not supported. To target multiple namespaces, declare one entry per namespace:
topics:
  - topic: orders.events
    valueType: example/OrderEvent
    targets:
      - orders/order-handler
      - audit/order-audit
      - billing/order-billing
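The accepted target forms described above can be expressed as a small pre-flight check on module.yaml entries. This is an illustrative sketch only: TargetPatternCheck is a hypothetical helper, not the library's parser, and it encodes just the rules stated here (one namespace, one name or `*`, no comma lists, no wildcard namespace).

```java
/** Hypothetical validator for target strings; not the StateFun implementation. */
final class TargetPatternCheck {
    private TargetPatternCheck() {}

    /** Accepts "<namespace>/<name>" or "<namespace>/*" and rejects everything else. */
    static boolean isSupported(String pattern) {
        if (pattern == null) {
            return false;
        }
        int slash = pattern.indexOf('/');
        // Require exactly one '/' with non-empty text on both sides.
        if (slash <= 0 || slash != pattern.lastIndexOf('/') || slash == pattern.length() - 1) {
            return false;
        }
        String namespace = pattern.substring(0, slash);
        String name = pattern.substring(slash + 1);
        // Wildcard namespaces and comma lists are not supported.
        if (containsAny(namespace, ",*")) {
            return false;
        }
        // The name is either the wildcard alone or a plain identifier.
        return name.equals("*") || !containsAny(name, ",*");
    }

    /** Returns true if s contains at least one character from chars. */
    private static boolean containsAny(String s, String chars) {
        for (int i = 0; i < s.length(); i++) {
            if (chars.indexOf(s.charAt(i)) >= 0) {
                return true;
            }
        }
        return false;
    }
}
```

Under these rules, orders/order-handler and orders/* are accepted, while */foo and orders/a,audit/b are rejected, which is why the example above lists each namespace as its own targets entry.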
Multi-topic ingresses
A single kafka.v1/ingress can declare multiple topics: entries; each has its own valueType and targets:
topics:
  - topic: orders.events
    valueType: example/OrderEvent
    targets: [orders/handler]
  - topic: shipments.events
    valueType: example/ShipmentEvent
    targets: [shipments/handler]
Custom Protobuf types
The default io.statefun.types/string works for simple cases. For typed binary payloads, define a Type with SimpleType.simpleImmutableTypeFrom(...) in your SDK code and reference its type name as valueType:
public static final Type<Order> ORDER_TYPE =
    SimpleType.simpleImmutableTypeFrom(
        TypeName.typeNameFromString("example/Order"),
        Order::toByteArray,
        Order::parseFrom);
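A dedicated type is more reliable than the generic string codec for binary content, because arbitrary bytes are not guaranteed to survive a round trip through a string encoding.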
Next steps
- Kinesis I/O — same routing model, AWS Kinesis transport.
- Kubernetes deployment — wiring ingress/egress in production.
- Architecture overview — how the dispatcher routes ingress messages.