# Logging
## How it works in StateFun on K8s
Both the JobManager and the TaskManager read their Logback config from `${FLINK_HOME}/conf/logback-console.xml`. Under the Flink Kubernetes Operator, that file lives in an auto-generated ConfigMap that is mounted read-only over `/opt/flink/conf/`. The mount replaces the whole directory, so anything baked into the image under `/opt/flink/conf/` is masked at runtime.
The Operator builds that ConfigMap from two sources:
- `config.yaml`: derived from `spec.flinkConfiguration`.
- Logging files: taken from `spec.logConfiguration`, a map of `<filename> → <content>` that you provide on the `FlinkDeployment`. The key for Logback is exactly `logback-console.xml`.
If `spec.logConfiguration` has no `logback-console.xml` entry, the Operator omits that key from the ConfigMap entirely. Logback then has no config file to read, falls back to `BasicConfigurator`, and writes plain-text output to stdout with the root logger at `DEBUG`. Structured JSON is opt-in via this block.
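To confirm the opt-in before looking at pods, you can inspect the `FlinkDeployment` itself. The sketch below assumes you feed it the output of `kubectl get flinkdeployment <name> -o json`; the function names are hypothetical, and the `LogstashEncoder` substring test is a deliberately rough heuristic rather than a full parse.

```python
import json
from typing import Optional

LOGBACK_KEY = "logback-console.xml"
LOGSTASH_ENCODER = "net.logstash.logback.encoder.LogstashEncoder"


def logback_config(deployment_json: str) -> Optional[str]:
    """Return spec.logConfiguration["logback-console.xml"], or None when it is unset."""
    spec = json.loads(deployment_json).get("spec") or {}
    log_configuration = spec.get("logConfiguration") or {}
    return log_configuration.get(LOGBACK_KEY)


def opts_into_json(deployment_json: str) -> bool:
    """Rough heuristic: the Logback key is set and its content mentions LogstashEncoder."""
    content = logback_config(deployment_json)
    return content is not None and LOGSTASH_ENCODER in content
```

A `None` from `logback_config` corresponds to the case above where the ConfigMap ships without the Logback key.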
## JSON config used by StateFun E2E
The reference config is embedded in `statefun-e2e-tests/statefun-e2e-k8s-native/src/test/resources/k8s/flink-deployment.yaml`. Copy-paste:
```yaml
spec:
  logConfiguration:
    logback-console.xml: |
      <?xml version="1.0" encoding="UTF-8"?>
      <configuration scan="true" scanPeriod="30 seconds">
        <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
          <encoder class="net.logstash.logback.encoder.LogstashEncoder">
            <timestampPattern>yyyy-MM-dd'T'HH:mm:ss.SSS'Z'</timestampPattern>
            <throwableConverter class="net.logstash.logback.stacktrace.ShortenedThrowableConverter">
              <maxDepthPerThrowable>120</maxDepthPerThrowable>
              <shortenedClassNameLength>60</shortenedClassNameLength>
              <exclude>^sun\.reflect\..*\.invoke</exclude>
              <exclude>^jdk\.internal\.reflect\..*</exclude>
              <exclude>^java\.lang\.reflect\.Method\.invoke</exclude>
              <rootCauseFirst>true</rootCauseFirst>
            </throwableConverter>
          </encoder>
        </appender>
        <root level="${ROOT_LOG_LEVEL:-INFO}">
          <appender-ref ref="console"/>
        </root>
        <logger name="org.apache.pekko" level="INFO"/>
        <logger name="org.apache.kafka" level="INFO"/>
        <logger name="org.apache.hadoop" level="INFO"/>
        <logger name="org.apache.zookeeper" level="INFO"/>
        <logger name="org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR"/>
        <logger name="com.amazonaws.services.s3.internal.Mimetypes" level="ERROR"/>
      </configuration>
```
What each piece does:
- `LogstashEncoder`: one single-line JSON object per event, with Logstash-standard field names (`@timestamp`, `level`, `logger_name`, `thread_name`, `message`, `stack_trace`; MDC fields are flattened into the top level).
- `ShortenedThrowableConverter` + `rootCauseFirst=true`: Flink wraps exceptions deeply (task → operator → function → user code), so printing the root cause first puts the user-actionable frame on top. The `<exclude>` patterns drop reflection-plumbing frames.
- `scan="true" scanPeriod="30 seconds"`: Logback's auto-reload. Logback re-checks the file every 30 seconds, so edits to the operator-mounted ConfigMap take effect without a pod restart. The kubelet's ConfigMap volume sync comes on top of that and typically adds up to a minute or so before the new file appears in the pod.
- `${ROOT_LOG_LEVEL:-INFO}`: the root level defaults to `INFO` but can be overridden per deployment via an env var or a `-D` JVM option (see below).
- Flink stock logger pins: copied verbatim from upstream `flink/conf/logback-console.xml`. Pekko, Kafka, Hadoop and ZooKeeper sit at `INFO`; Netty's noisy pipeline logger sits at `ERROR`.
- `com.amazonaws.services.s3.internal.Mimetypes` at `ERROR`: silences a benign AWS SDK v1 startup warning emitted by `flink-s3-fs-presto`.
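The `${ROOT_LOG_LEVEL:-INFO}` fallback can be modeled in a few lines. This is a simplified sketch, not Logback's implementation: it assumes Logback's documented lookup order (local and context properties, then JVM system properties, then OS environment variables) and leaves out the first two scopes, so a `-D` value wins over an env var of the same name.

```python
import re
from typing import Mapping

# ${NAME} or ${NAME:-default}; nested variables are out of scope for this sketch.
_VARIABLE = re.compile(r"\$\{([^}:]+)(?::-([^}]*))?\}")


def substitute(value: str, system_props: Mapping[str, str], env: Mapping[str, str]) -> str:
    """Simplified model of Logback variable substitution.

    Lookup order modeled: JVM system properties (-D), then OS environment
    variables, then the inline default after ':-'. Logback's local and
    context scopes, which it consults first, are left out.
    """

    def resolve(match):
        name, default = match.group(1), match.group(2)
        if name in system_props:
            return system_props[name]
        if name in env:
            return env[name]
        if default is not None:
            return default
        return name + "_IS_UNDEFINED"  # the marker Logback leaves for unresolved variables

    return _VARIABLE.sub(resolve, value)
```

For example, `substitute('<root level="${ROOT_LOG_LEVEL:-INFO}">', {}, {"ROOT_LOG_LEVEL": "WARN"})` yields `<root level="WARN">`.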
Sample emitted record (real JM output):
```json
{"@timestamp":"2026-05-17T17:14:32.597Z","@version":"1","message":"Preconfiguration: ","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
```
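Because every event is one JSON object per line, ad-hoc triage needs nothing beyond a JSON parser. A minimal sketch, assuming the input is `kubectl logs <jm-pod>` output split into lines; `level_value` follows Logback's numeric levels (`INFO` = 20000, `WARN` = 30000, `ERROR` = 40000), and non-JSON lines are skipped. The function name is hypothetical.

```python
import json
from typing import Iterable, Iterator

# Numeric level_value emitted by LogstashEncoder (Logback's Level.toInt()).
WARN_LEVEL_VALUE = 30000


def warnings_and_errors(lines: Iterable[str]) -> Iterator[dict]:
    """Yield parsed events at WARN or above; skip lines that are not JSON objects."""
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # e.g. plain-text output from a pod running without the JSON config
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        if event.get("level_value", 0) >= WARN_LEVEL_VALUE:
            yield event
```

The sample record above has `level_value` 20000, so it is filtered out.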
## Override the root level
| Mechanism | Effect | Scope |
|---|---|---|
| `ROOT_LOG_LEVEL=DEBUG` container env var (e.g. via `spec.podTemplate`) | Root level only; loggers pinned in the config keep their explicit levels | Per pod, restart needed |
| `-DROOT_LOG_LEVEL=DEBUG` JVM option (via `env.java.opts.all`) | Same as above; takes precedence over the env var | Per pod, restart needed |
| `<logger>` entries added to `spec.logConfiguration.logback-console.xml` | Per-logger overrides | Per `FlinkDeployment`, restart needed |
| Edit the operator-mounted ConfigMap directly (`kubectl edit cm flink-config-<deployment>`) | Per-logger overrides | No restart; picked up once the kubelet syncs the volume, plus up to 30 s for `scan="true"` |
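The last option is the safest path during an incident because it avoids a pod cycle. The edit is not written back to the `FlinkDeployment`, though, so it is lost when the Operator next redeploys the cluster; carry any change you want to keep into `spec.logConfiguration`.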
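A root-level override does not reach loggers that the config pins explicitly. The sketch below models Logback's level inheritance in simplified form (the nearest ancestor with an explicit level wins, otherwise the root level applies), using the pins from the reference config; only dot-separated names are handled, and the function names are hypothetical.

```python
from typing import Mapping

# Logback's numeric levels: a lower value means more verbose.
LEVELS = {"TRACE": 5000, "DEBUG": 10000, "INFO": 20000, "WARN": 30000, "ERROR": 40000}

# Explicit <logger> pins copied from the reference config.
PINS = {
    "org.apache.pekko": "INFO",
    "org.apache.kafka": "INFO",
    "org.apache.hadoop": "INFO",
    "org.apache.zookeeper": "INFO",
    "org.jboss.netty.channel.DefaultChannelPipeline": "ERROR",
    "com.amazonaws.services.s3.internal.Mimetypes": "ERROR",
}


def effective_level(logger: str, root_level: str, pins: Mapping[str, str] = PINS) -> str:
    """The logger itself or its nearest ancestor with an explicit level wins; else root."""
    name = logger
    while name:
        if name in pins:
            return pins[name]
        name = name.rpartition(".")[0]  # "a.b.c" -> "a.b" -> "a" -> ""
    return root_level


def is_enabled(
    logger: str, event_level: str, root_level: str, pins: Mapping[str, str] = PINS
) -> bool:
    """True if an event at event_level on this logger would be emitted."""
    return LEVELS[event_level] >= LEVELS[effective_level(logger, root_level, pins)]
```

With `ROOT_LOG_LEVEL=DEBUG`, Kafka client DEBUG events stay suppressed because `org.apache.kafka` is pinned at `INFO`; an explicit `<logger>` entry for the class is what actually raises it.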
## In-incident DEBUG candidates
| Symptom | Logger to bump |
|---|---|
| Checkpoints failing / slow | org.apache.flink.runtime.checkpoint.CheckpointCoordinator |
| HA failover / split brain | org.apache.flink.runtime.leaderelection |
| TM crash / restart | org.apache.flink.runtime.taskmanager |
| Kafka consumer-group rebalance | org.apache.kafka.clients.consumer.internals.ConsumerCoordinator |
| Kafka source lag | org.apache.flink.connector.kafka.source |
| StateFun routing / dispatch | org.apache.flink.statefun.flink.core |
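| K8s client errors | org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client (Flink relocates fabric8; plain io.fabric8.kubernetes.client only matches unshaded copies) |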
| RocksDB state-backend | org.apache.flink.contrib.streaming.state |
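To bump one of these during an incident, add a `<logger>` line to the `logback-console.xml` content in the ConfigMap. The helper below is a hypothetical sketch that inserts the lines as text just before `</configuration>`, leaving the rest of the file untouched, and re-parses the result so a typo fails before it reaches the pod.

```python
import xml.etree.ElementTree as ET
from typing import Iterable
from xml.sax.saxutils import quoteattr

CLOSING_TAG = "</configuration>"


def add_debug_loggers(config_xml: str, logger_names: Iterable[str], level: str = "DEBUG") -> str:
    """Insert one <logger name=... level=.../> line per name just before </configuration>.

    Text insertion leaves the rest of the file (comments, layout) untouched.
    """
    idx = config_xml.rfind(CLOSING_TAG)
    if idx == -1:
        raise ValueError("no </configuration> closing tag found")
    new_lines = "".join(
        f"  <logger name={quoteattr(name)} level={quoteattr(level)}/>\n" for name in logger_names
    )
    updated = config_xml[:idx] + new_lines + config_xml[idx:]
    ET.fromstring(updated)  # raises ParseError if the edit produced malformed XML
    return updated
```

For the "Checkpoints failing / slow" row, pass `["org.apache.flink.runtime.checkpoint.CheckpointCoordinator"]`.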
## How to verify what the pod actually sees
```bash
kubectl exec <jm-pod> -- cat /opt/flink/conf/logback-console.xml
kubectl exec <tm-pod> -- cat /opt/flink/conf/logback-console.xml
```
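If `cat` fails with "No such file or directory" (or prints nothing), the Operator generated the ConfigMap without the `logback-console.xml` key. If the file exists but doesn't contain `LogstashEncoder`, the key is set to a non-JSON config. Either way, re-check `spec.logConfiguration` on the `FlinkDeployment`.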
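The same check can be scripted, e.g. when sweeping many pods. A minimal sketch, assuming the input is the text printed by the `kubectl exec ... cat` commands above (empty when the file is missing); the function name and returned labels are hypothetical.

```python
import xml.etree.ElementTree as ET

LOGSTASH_ENCODER = "net.logstash.logback.encoder.LogstashEncoder"


def diagnose(config_text: str) -> str:
    """Classify the contents of /opt/flink/conf/logback-console.xml as read from a pod."""
    if not config_text.strip():
        return "missing"  # key absent from the ConfigMap
    try:
        root = ET.fromstring(config_text)
    except ET.ParseError:
        return "malformed"
    encoder_classes = {enc.get("class") for enc in root.iter("encoder")}
    return "json" if LOGSTASH_ENCODER in encoder_classes else "not-json"
```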
## Aggregator compatibility
LogstashEncoder defaults drop cleanly into Elasticsearch / Logstash (native), Loki / Grafana (JSON parser stage), Datadog (level/message map directly), and CloudWatch / OpenObserve (JSON ingestion native). Existing logback_events_total{level="error"} counters propagate unchanged.
## Async logging: when to opt in
The reference config writes synchronously, matching Flink upstream. Sync is fine up to ~5k events/sec/pod; checkpoint-completion bursts on typical workloads peak at 50–100 events/sec/pod. For genuinely high throughput, wrap the console appender in ch.qos.logback.classic.AsyncAppender and add an explicit <shutdownHook> to drain on SIGTERM (Logback 1.4+ no longer auto-registers one).
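A sketch of that wiring, embedded as a string so it can be sanity-checked. The appender name `async`, the `queueSize` value and the checker are assumptions for illustration; `console` is the appender from the reference config, and keeping it declared before the `async` appender that references it is the safe ordering. Setting `discardingThreshold` to 0 matters here: by default `AsyncAppender` drops TRACE, DEBUG and INFO events once less than 20% of its queue is free.

```python
import xml.etree.ElementTree as ET

ASYNC_CLASS = "ch.qos.logback.classic.AsyncAppender"

# Hypothetical fragment to merge into the reference config: the <shutdownHook> and the
# "async" appender go after the existing "console" appender; <root> replaces the old one.
ASYNC_FRAGMENT = """\
<configuration>
  <shutdownHook class="ch.qos.logback.core.hook.DefaultShutdownHook"/>
  <appender name="async" class="ch.qos.logback.classic.AsyncAppender">
    <queueSize>8192</queueSize>
    <!-- 0 keeps TRACE/DEBUG/INFO events instead of dropping them as the queue fills -->
    <discardingThreshold>0</discardingThreshold>
    <appender-ref ref="console"/>
  </appender>
  <root level="${ROOT_LOG_LEVEL:-INFO}">
    <appender-ref ref="async"/>
  </root>
</configuration>
"""


def async_wiring_problems(config_xml: str) -> list:
    """Return human-readable wiring problems; an empty list means the wiring looks right."""
    root = ET.fromstring(config_xml)
    problems = []
    if root.find("shutdownHook") is None:
        problems.append("no <shutdownHook>: events still queued at SIGTERM may be lost")
    appender_classes = {a.get("name"): a.get("class") for a in root.findall("appender")}
    root_logger = root.find("root")
    refs = [] if root_logger is None else [r.get("ref") for r in root_logger.findall("appender-ref")]
    for ref in refs:
        if appender_classes.get(ref) != ASYNC_CLASS:
            problems.append(f"root writes to {ref!r} without an AsyncAppender in between")
    return problems
```

With `discardingThreshold` at 0 and the default `neverBlock=false`, a full queue makes logging threads block rather than drop events; set `neverBlock` to `true` if losing events is preferable to stalling a task thread.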