diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 4d8e96c3..00cebb8b 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,5 +1,8 @@
---
-exclude: ^(Cargo\.nix|crate-hashes\.json|nix/.*)$
+# The files tests/templates/kuttl/smoke/31_configmap_.* define the expected ConfigMap contents.
+# The trailing blank lines at the end of these files are intentional (they are part of the
+# expected content) and must not be removed by the pre-commit hook "end-of-file-fixer".
+exclude: ^(Cargo\.nix|crate-hashes\.json|nix/.*|tests/templates/kuttl/smoke/31_configmap_.*)$
default_language_version:
node: system
diff --git a/tests/templates/kuttl/smoke/30-assert.yaml.j2 b/tests/templates/kuttl/smoke/30-assert.yaml.j2
index 1e2c4792..3d71a8ea 100644
--- a/tests/templates/kuttl/smoke/30-assert.yaml.j2
+++ b/tests/templates/kuttl/smoke/30-assert.yaml.j2
@@ -405,6 +405,45 @@ spec:
name: listener
- mountPath: /stackable/data
name: data
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
+ - args:
+ - |
+ mkdir --parents /stackable/log/_vector-state
+ # Vector will ignore SIGTERM (as PID != 1) and must be shut down by writing a shutdown trigger file
+ vector --config /stackable/config/vector.yaml & vector_pid=$!
+ if [ ! -f "/stackable/log/_vector/shutdown" ]; then
+ mkdir -p /stackable/log/_vector && inotifywait -qq --event create /stackable/log/_vector; fi
+ sleep 1
+ kill $vector_pid
+ command:
+ - /bin/bash
+ - -x
+ - -euo
+ - pipefail
+ - -c
+ env:
+ - name: VECTOR_LOG
+ value: info
+ - name: VECTOR_AGGREGATOR_ADDRESS
+ valueFrom:
+ configMapKeyRef:
+ key: ADDRESS
+ name: vector-aggregator-discovery
+ imagePullPolicy: IfNotPresent
+ name: vector
+ resources:
+ limits:
+ cpu: 500m
+ memory: 128Mi
+ requests:
+ cpu: 250m
+ memory: 128Mi
+ volumeMounts:
+ - mountPath: /stackable/config
+ name: hdfs-config
+ - mountPath: /stackable/log
+ name: log
+{% endif %}
- args:
- |
mkdir -p /stackable/config/zkfc
@@ -457,9 +496,6 @@ spec:
name: zkfc-config
- mountPath: /stackable/mount/log/zkfc
name: zkfc-log-config
-{% if lookup('env', 'VECTOR_AGGREGATOR') %}
- - name: vector
-{% endif %}
enableServiceLinks: false
initContainers:
- args:
@@ -710,7 +746,9 @@ spec:
name: hdfs-namenode-default
name: format-zookeeper-log-config
volumeClaimTemplates:
- - metadata:
+ - apiVersion: v1
+ kind: PersistentVolumeClaim
+ metadata:
name: data
spec:
accessModes:
@@ -719,7 +757,9 @@ spec:
requests:
storage: 2Gi
volumeMode: Filesystem
- - metadata:
+ - apiVersion: v1
+ kind: PersistentVolumeClaim
+ metadata:
annotations:
listeners.stackable.tech/listener-class: {{ test_scenario['values']['listener-class'] }}
labels:
diff --git a/tests/templates/kuttl/smoke/31-assert.yaml.j2 b/tests/templates/kuttl/smoke/31-assert.yaml.j2
index 3e5090d3..3ac31d65 100644
--- a/tests/templates/kuttl/smoke/31-assert.yaml.j2
+++ b/tests/templates/kuttl/smoke/31-assert.yaml.j2
@@ -6,28 +6,20 @@
# envsubst expands only the listed shell variables ($NAMESPACE etc.),
# leaving Hadoop ${env.FOO} references untouched.
#
-# Both sides are converted to JSON via yq, then jq normalises trailing
-# newlines in string values (collapsing \n+ to \n) so YAML block scalar
-# style (|, |+) differences don't cause false failures.
-# vector.toml is excluded because its presence depends on the
-# VECTOR_AGGREGATOR env var set by the CI/test runner. The snapshot
-# files are plain YAML (not Jinja2 templates), so they cannot
-# conditionally include it.
-#
# kuttl runs script commands with sh, not bash, so process substitution
# is unavailable. Temp files namespaced under $NAMESPACE are used for
# diff output to avoid collisions when tests run concurrently.
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
-timeout: 600
+timeout: 60
commands:
#
# ConfigMap data snapshot: hdfs-namenode-default
#
- script: |
- envsubst '$NAMESPACE' < 31_configmap_hdfs-namenode-default.yaml | yq -o=json | jq 'with_entries(.value |= sub("\n+$"; "\n"))' > /tmp/$NAMESPACE-expected.json
- kubectl -n $NAMESPACE get cm hdfs-namenode-default -o yaml | yq -o=json '.data | del(.["vector.toml"])' | jq 'with_entries(.value |= sub("\n+$"; "\n"))' > /tmp/$NAMESPACE-actual.json
- if ! diff /tmp/$NAMESPACE-expected.json /tmp/$NAMESPACE-actual.json; then
+ envsubst '$NAMESPACE' < 31_configmap_hdfs-namenode-default.yaml > /tmp/$NAMESPACE-expected.yaml
+ kubectl -n $NAMESPACE get cm hdfs-namenode-default -o yaml | yq '.data' > /tmp/$NAMESPACE-actual.yaml
+ if ! diff /tmp/$NAMESPACE-expected.yaml /tmp/$NAMESPACE-actual.yaml; then
echo "ERROR: ConfigMap hdfs-namenode-default data drifted from snapshot."
exit 1
fi
@@ -40,9 +32,9 @@ commands:
{% else %}
export DATANODE_DATA_DIR='[DISK]/stackable/data/data/datanode'
{% endif %}
- envsubst '$NAMESPACE $DATANODE_DATA_DIR' < 31_configmap_hdfs-datanode-default.yaml | yq -o=json | jq 'with_entries(.value |= sub("\n+$"; "\n"))' > /tmp/$NAMESPACE-expected.json
- kubectl -n $NAMESPACE get cm hdfs-datanode-default -o yaml | yq -o=json '.data | del(.["vector.toml"])' | jq 'with_entries(.value |= sub("\n+$"; "\n"))' > /tmp/$NAMESPACE-actual.json
- if ! diff /tmp/$NAMESPACE-expected.json /tmp/$NAMESPACE-actual.json; then
+ envsubst '$NAMESPACE $DATANODE_DATA_DIR' < 31_configmap_hdfs-datanode-default.yaml > /tmp/$NAMESPACE-expected.yaml
+ kubectl -n $NAMESPACE get cm hdfs-datanode-default -o yaml | yq '.data' > /tmp/$NAMESPACE-actual.yaml
+ if ! diff /tmp/$NAMESPACE-expected.yaml /tmp/$NAMESPACE-actual.yaml; then
echo "ERROR: ConfigMap hdfs-datanode-default data drifted from snapshot."
exit 1
fi
@@ -50,9 +42,9 @@ commands:
# ConfigMap data snapshot: hdfs-journalnode-default
#
- script: |
- envsubst '$NAMESPACE' < 31_configmap_hdfs-journalnode-default.yaml | yq -o=json | jq 'with_entries(.value |= sub("\n+$"; "\n"))' > /tmp/$NAMESPACE-expected.json
- kubectl -n $NAMESPACE get cm hdfs-journalnode-default -o yaml | yq -o=json '.data | del(.["vector.toml"])' | jq 'with_entries(.value |= sub("\n+$"; "\n"))' > /tmp/$NAMESPACE-actual.json
- if ! diff /tmp/$NAMESPACE-expected.json /tmp/$NAMESPACE-actual.json; then
+ envsubst '$NAMESPACE' < 31_configmap_hdfs-journalnode-default.yaml > /tmp/$NAMESPACE-expected.yaml
+ kubectl -n $NAMESPACE get cm hdfs-journalnode-default -o yaml | yq '.data' > /tmp/$NAMESPACE-actual.yaml
+ if ! diff /tmp/$NAMESPACE-expected.yaml /tmp/$NAMESPACE-actual.yaml; then
echo "ERROR: ConfigMap hdfs-journalnode-default data drifted from snapshot."
exit 1
fi
diff --git a/tests/templates/kuttl/smoke/31_configmap_hdfs-datanode-default.yaml b/tests/templates/kuttl/smoke/31_configmap_hdfs-datanode-default.yaml
deleted file mode 100644
index 909a6494..00000000
--- a/tests/templates/kuttl/smoke/31_configmap_hdfs-datanode-default.yaml
+++ /dev/null
@@ -1,185 +0,0 @@
----
-core-site.xml: |-
-
-
-
- fs.defaultFS
- hdfs://hdfs/
-
-
- ha.zookeeper.quorum
- ${env.ZOOKEEPER}
-
-
- hadoop.prometheus.endpoint.enabled
- true
-
-
- io.file.buffer.size
- 131072
-
-
-hadoop-policy.xml: |-
-
-
-
-hdfs-site.xml: |-
-
-
-
- dfs.client.failover.proxy.provider.hdfs
- org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
-
-
- dfs.datanode.data.dir
- $DATANODE_DATA_DIR
-
-
- dfs.datanode.handler.count
- 50
-
-
- dfs.datanode.max.transfer.threads
- 8192
-
-
- dfs.datanode.registered.hostname
- ${env.POD_ADDRESS}
-
-
- dfs.datanode.registered.http.port
- ${env.HTTP_PORT}
-
-
- dfs.datanode.registered.ipc.port
- ${env.IPC_PORT}
-
-
- dfs.datanode.registered.port
- ${env.DATA_PORT}
-
-
- dfs.datanode.sync.behind.writes
- true
-
-
- dfs.datanode.synconclose
- true
-
-
- dfs.ha.automatic-failover.enabled
- true
-
-
- dfs.ha.fencing.methods
- shell(/bin/true)
-
-
- dfs.ha.namenode.id
- ${env.POD_NAME}
-
-
- dfs.ha.namenodes.hdfs
- hdfs-namenode-default-0,hdfs-namenode-default-1
-
-
- dfs.journalnode.edits.dir
- /stackable/data/journalnode
-
-
- dfs.namenode.datanode.registration.unsafe.allow-address-override
- true
-
-
- dfs.namenode.handler.count
- 50
-
-
- dfs.namenode.http-address.hdfs.hdfs-namenode-default-0
- hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870
-
-
- dfs.namenode.http-address.hdfs.hdfs-namenode-default-1
- hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870
-
-
- dfs.namenode.name.dir
- /stackable/data/namenode
-
-
- dfs.namenode.name.dir.hdfs.hdfs-namenode-default-0
- /stackable/data/namenode
-
-
- dfs.namenode.name.dir.hdfs.hdfs-namenode-default-1
- /stackable/data/namenode
-
-
- dfs.namenode.replication.max-streams
- 4
-
-
- dfs.namenode.replication.max-streams-hard-limit
- 8
-
-
- dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-0
- hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020
-
-
- dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-1
- hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020
-
-
- dfs.namenode.shared.edits.dir
- qjournal://hdfs-journalnode-default-0.hdfs-journalnode-default.$NAMESPACE.svc.cluster.local:8485/hdfs
-
-
- dfs.nameservices
- hdfs
-
-
- dfs.replication
- 1
-
-
-hdfs.log4j.properties: |+
- log4j.rootLogger=INFO, CONSOLE, FILE
-
- log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
- log4j.appender.CONSOLE.Threshold=INFO
- log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
- log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
-
- log4j.appender.FILE=org.apache.log4j.RollingFileAppender
- log4j.appender.FILE.Threshold=INFO
- log4j.appender.FILE.File=/stackable/log/hdfs/hdfs.log4j.xml
- log4j.appender.FILE.MaxFileSize=5MB
- log4j.appender.FILE.MaxBackupIndex=1
- log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout
-
-security.properties: |
- networkaddress.cache.negative.ttl=0
- networkaddress.cache.ttl=30
-ssl-client.xml: |-
-
-
-
-ssl-server.xml: |-
-
-
-
-wait-for-namenodes.log4j.properties: |+
- log4j.rootLogger=INFO, CONSOLE, FILE
-
- log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
- log4j.appender.CONSOLE.Threshold=INFO
- log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
- log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
-
- log4j.appender.FILE=org.apache.log4j.RollingFileAppender
- log4j.appender.FILE.Threshold=INFO
- log4j.appender.FILE.File=/stackable/log/wait-for-namenodes/wait-for-namenodes.log4j.xml
- log4j.appender.FILE.MaxFileSize=5MB
- log4j.appender.FILE.MaxBackupIndex=1
- log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout
diff --git a/tests/templates/kuttl/smoke/31_configmap_hdfs-datanode-default.yaml.j2 b/tests/templates/kuttl/smoke/31_configmap_hdfs-datanode-default.yaml.j2
new file mode 100644
index 00000000..aecd285e
--- /dev/null
+++ b/tests/templates/kuttl/smoke/31_configmap_hdfs-datanode-default.yaml.j2
@@ -0,0 +1,785 @@
+core-site.xml: |-
+
+
+
+ fs.defaultFS
+ hdfs://hdfs/
+
+
+ ha.zookeeper.quorum
+ ${env.ZOOKEEPER}
+
+
+ hadoop.prometheus.endpoint.enabled
+ true
+
+
+ io.file.buffer.size
+ 131072
+
+
+hadoop-policy.xml: |-
+
+
+
+hdfs-site.xml: |-
+
+
+
+ dfs.client.failover.proxy.provider.hdfs
+ org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
+
+
+ dfs.datanode.data.dir
+ $DATANODE_DATA_DIR
+
+
+ dfs.datanode.handler.count
+ 50
+
+
+ dfs.datanode.max.transfer.threads
+ 8192
+
+
+ dfs.datanode.registered.hostname
+ ${env.POD_ADDRESS}
+
+
+ dfs.datanode.registered.http.port
+ ${env.HTTP_PORT}
+
+
+ dfs.datanode.registered.ipc.port
+ ${env.IPC_PORT}
+
+
+ dfs.datanode.registered.port
+ ${env.DATA_PORT}
+
+
+ dfs.datanode.sync.behind.writes
+ true
+
+
+ dfs.datanode.synconclose
+ true
+
+
+ dfs.ha.automatic-failover.enabled
+ true
+
+
+ dfs.ha.fencing.methods
+ shell(/bin/true)
+
+
+ dfs.ha.namenode.id
+ ${env.POD_NAME}
+
+
+ dfs.ha.namenodes.hdfs
+ hdfs-namenode-default-0,hdfs-namenode-default-1
+
+
+ dfs.journalnode.edits.dir
+ /stackable/data/journalnode
+
+
+ dfs.namenode.datanode.registration.unsafe.allow-address-override
+ true
+
+
+ dfs.namenode.handler.count
+ 50
+
+
+ dfs.namenode.http-address.hdfs.hdfs-namenode-default-0
+ hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870
+
+
+ dfs.namenode.http-address.hdfs.hdfs-namenode-default-1
+ hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870
+
+
+ dfs.namenode.name.dir
+ /stackable/data/namenode
+
+
+ dfs.namenode.name.dir.hdfs.hdfs-namenode-default-0
+ /stackable/data/namenode
+
+
+ dfs.namenode.name.dir.hdfs.hdfs-namenode-default-1
+ /stackable/data/namenode
+
+
+ dfs.namenode.replication.max-streams
+ 4
+
+
+ dfs.namenode.replication.max-streams-hard-limit
+ 8
+
+
+ dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-0
+ hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020
+
+
+ dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-1
+ hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020
+
+
+ dfs.namenode.shared.edits.dir
+ qjournal://hdfs-journalnode-default-0.hdfs-journalnode-default.$NAMESPACE.svc.cluster.local:8485/hdfs
+
+
+ dfs.nameservices
+ hdfs
+
+
+ dfs.replication
+ 1
+
+
+hdfs.log4j.properties: |+
+ log4j.rootLogger=INFO, CONSOLE, FILE
+
+ log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+ log4j.appender.CONSOLE.Threshold=INFO
+ log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+ log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+ log4j.appender.FILE=org.apache.log4j.RollingFileAppender
+ log4j.appender.FILE.Threshold=INFO
+ log4j.appender.FILE.File=/stackable/log/hdfs/hdfs.log4j.xml
+ log4j.appender.FILE.MaxFileSize=5MB
+ log4j.appender.FILE.MaxBackupIndex=1
+ log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout
+
+security.properties: |
+ networkaddress.cache.negative.ttl=0
+ networkaddress.cache.ttl=30
+ssl-client.xml: |-
+
+
+
+ssl-server.xml: |-
+
+
+
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
+vector.yaml: |
+ data_dir: /stackable/log/_vector-state
+
+ log_schema:
+ host_key: pod
+
+ sources:
+ vector:
+ type: internal_logs
+
+ files_stdout:
+ type: file
+ include:
+ - /stackable/log/*/*.stdout.log
+
+ files_stderr:
+ type: file
+ include:
+ - /stackable/log/*/*.stderr.log
+
+ files_log4j:
+ type: file
+ include:
+ - /stackable/log/*/*.log4j.xml
+ line_delimiter: "\r\n"
+ multiline:
+ mode: halt_before
+ start_pattern: ^" + raw_message + ""
+ parsed_event, err = parse_xml(wrapped_xml_event)
+ if err != null {
+ error = "XML not parsable: " + err
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ root = object!(parsed_event.root)
+ if !is_object(root.event) {
+ error = "Parsed event contains no \"event\" tag."
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ if keys(root) != ["event"] {
+ .errors = push(.errors, "Parsed event contains multiple tags: " + join!(keys(root), ", "))
+ }
+ event = object!(root.event)
+
+ epoch_milliseconds, err = to_int(event.@timestamp)
+ if err == null && epoch_milliseconds != 0 {
+ converted_timestamp, err = from_unix_timestamp(epoch_milliseconds, "milliseconds")
+ if err == null {
+ .timestamp = converted_timestamp
+ } else {
+ .errors = push(.errors, "Time not parsable, using current time instead: " + err)
+ }
+ } else {
+ .errors = push(.errors, "Timestamp not found, using current time instead.")
+ }
+
+ .logger, err = string(event.@logger)
+ if err != null || is_empty(.logger) {
+ .errors = push(.errors, "Logger not found.")
+ }
+
+ level, err = string(event.@level)
+ if err != null {
+ .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.")
+ } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) {
+ .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.")
+ } else {
+ .level = level
+ }
+
+ message, err = string(event.message)
+ if err != null || is_empty(message) {
+ .errors = push(.errors, "Message not found.")
+ }
+ throwable = string(event.throwable) ?? ""
+ .message = join!(compact([message, throwable]), "\n")
+ }
+ }
+
+ processed_files_log4j2:
+ inputs:
+ - files_log4j2
+ type: remap
+ source: |
+ raw_message = string!(.message)
+
+ .timestamp = now()
+ .logger = ""
+ .level = "INFO"
+ .message = ""
+ .errors = []
+
+ event = {}
+ parsed_event, err = parse_xml(raw_message)
+ if err != null {
+ error = "XML not parsable: " + err
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ if !is_object(parsed_event.Event) {
+ error = "Parsed event contains no \"Event\" tag."
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ event = object!(parsed_event.Event)
+
+ tag_instant_valid = false
+ instant, err = object(event.Instant)
+ if err == null {
+ epoch_nanoseconds, err = to_int(instant.@epochSecond) * 1_000_000_000 + to_int(instant.@nanoOfSecond)
+ if err == null && epoch_nanoseconds != 0 {
+ converted_timestamp, err = from_unix_timestamp(epoch_nanoseconds, "nanoseconds")
+ if err == null {
+ .timestamp = converted_timestamp
+ tag_instant_valid = true
+ } else {
+ .errors = push(.errors, "Instant invalid, trying property timeMillis instead: " + err)
+ }
+ } else {
+ .errors = push(.errors, "Instant invalid, trying property timeMillis instead: " + err)
+ }
+ }
+ if !tag_instant_valid {
+ epoch_milliseconds, err = to_int(event.@timeMillis)
+ if err == null && epoch_milliseconds != 0 {
+ converted_timestamp, err = from_unix_timestamp(epoch_milliseconds, "milliseconds")
+ if err == null {
+ .timestamp = converted_timestamp
+ } else {
+ .errors = push(.errors, "timeMillis not parsable, using current time instead: " + err)
+ }
+ } else {
+ .errors = push(.errors, "timeMillis not parsable, using current time instead: " + err)
+ }
+ }
+
+ .logger, err = string(event.@loggerName)
+ if err != null || is_empty(.logger) {
+ .errors = push(.errors, "Logger not found.")
+ }
+
+ level, err = string(event.@level)
+ if err != null {
+ .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.")
+ } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) {
+ .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.")
+ } else {
+ .level = level
+ }
+
+ exception = null
+ thrown = event.Thrown
+ if is_object(thrown) {
+ exception = "Exception"
+ thread, err = string(event.@thread)
+ if err == null && !is_empty(thread) {
+ exception = exception + " in thread \"" + thread + "\""
+ }
+ thrown_name, err = string(thrown.@name)
+ if err == null && !is_empty(exception) {
+ exception = exception + " " + thrown_name
+ }
+ message = string(thrown.@localizedMessage) ??
+ string(thrown.@message) ??
+ ""
+ if !is_empty(message) {
+ exception = exception + ": " + message
+ }
+ stacktrace_items = array(thrown.ExtendedStackTrace.ExtendedStackTraceItem) ?? []
+ stacktrace = ""
+ for_each(stacktrace_items) -> |_index, value| {
+ stacktrace = stacktrace + " "
+ class = string(value.@class) ?? ""
+ method = string(value.@method) ?? ""
+ if !is_empty(class) && !is_empty(method) {
+ stacktrace = stacktrace + "at " + class + "." + method
+ }
+ file = string(value.@file) ?? ""
+ line = string(value.@line) ?? ""
+ if !is_empty(file) && !is_empty(line) {
+ stacktrace = stacktrace + "(" + file + ":" + line + ")"
+ }
+ exact = to_bool(value.@exact) ?? false
+ location = string(value.@location) ?? ""
+ version = string(value.@version) ?? ""
+ if !is_empty(location) && !is_empty(version) {
+ stacktrace = stacktrace + " "
+ if !exact {
+ stacktrace = stacktrace + "~"
+ }
+ stacktrace = stacktrace + "[" + location + ":" + version + "]"
+ }
+ stacktrace = stacktrace + "\n"
+ }
+ if stacktrace != "" {
+ exception = exception + "\n" + stacktrace
+ }
+ }
+
+ message, err = string(event.Message)
+ if err != null || is_empty(message) {
+ message = null
+ .errors = push(.errors, "Message not found.")
+ }
+ .message = join!(compact([message, exception]), "\n")
+ }
+ }
+
+ processed_files_py:
+ inputs:
+ - files_py
+ type: remap
+ source: |
+ raw_message = string!(.message)
+
+ .timestamp = now()
+ .logger = ""
+ .level = "INFO"
+ .message = ""
+ .errors = []
+
+ parsed_event, err = parse_json(raw_message)
+ if err != null {
+ error = "JSON not parsable: " + err
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else if !is_object(parsed_event) {
+ error = "Parsed event is not a JSON object."
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ event = object!(parsed_event)
+
+ asctime, err = string(event.asctime)
+ if err == null {
+ parsed_timestamp, err = parse_timestamp(asctime, "%F %T,%3f")
+ if err == null {
+ .timestamp = parsed_timestamp
+ } else {
+ .errors = push(.errors, "Timestamp not parsable, using current time instead: "+ err)
+ }
+ } else {
+ .errors = push(.errors, "Timestamp not found, using current time instead.")
+ }
+
+ .logger, err = string(event.name)
+ if err != null || is_empty(.logger) {
+ .errors = push(.errors, "Logger not found.")
+ }
+
+ level, err = string(event.levelname)
+ if err != null {
+ .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.")
+ } else if level == "DEBUG" {
+ .level = "DEBUG"
+ } else if level == "INFO" {
+ .level = "INFO"
+ } else if level == "WARNING" {
+ .level = "WARN"
+ } else if level == "ERROR" {
+ .level = "ERROR"
+ } else if level == "CRITICAL" {
+ .level = "FATAL"
+ } else {
+ .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.")
+ }
+
+ .message, err = string(event.message)
+ if err != null || is_empty(.message) {
+ .errors = push(.errors, "Message not found.")
+ }
+ }
+
+ processed_files_airlift:
+ inputs:
+ - files_airlift
+ type: remap
+ source: |
+ raw_message = string!(.message)
+
+ .timestamp = now()
+ .logger = ""
+ .level = "INFO"
+ .message = ""
+ .errors = []
+
+ parsed_event, err = parse_json(raw_message)
+ if err != null {
+ error = "JSON not parsable: " + err
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else if !is_object(parsed_event) {
+ error = "Parsed event is not a JSON object."
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ event = object!(parsed_event)
+
+ timestamp_string, err = string(event.timestamp)
+ if err == null {
+ parsed_timestamp, err = parse_timestamp(timestamp_string, "%Y-%m-%dT%H:%M:%S.%fZ")
+ if err == null {
+ .timestamp = parsed_timestamp
+ } else {
+ .errors = push(.errors, "Timestamp not parsable, using current time instead: " + err)
+ }
+ } else {
+ .errors = push(.errors, "Timestamp not found, using current time instead.")
+ }
+
+ .logger, err = string(event.logger)
+ if err != null || is_empty(.logger) {
+ .errors = push(.errors, "Logger not found.")
+ }
+
+ level, err = string(event.level)
+ if err != null {
+ .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.")
+ } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) {
+ .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.")
+ } else {
+ .level = level
+ }
+
+ .thread = string(parsed_event.thread) ?? null
+
+ .message, err = string(event.message)
+ if err != null || is_empty(.message) {
+ .errors = push(.errors, "Message not found.")
+ }
+ stacktrace = string(event.stackTrace) ?? ""
+ .message = join!(compact([.message, stacktrace]), "\n\n")
+ }
+
+ extended_logs_files:
+ inputs:
+ - processed_files_*
+ type: remap
+ source: |
+ del(.source_type)
+ if .errors == [] {
+ del(.errors)
+ }
+ . |= parse_regex!(.file, r'^/stackable/log/(?P<container>.*?)/(?P<file>.*?)$')
+
+ filtered_logs_vector:
+ inputs:
+ - vector
+ type: filter
+ condition: '!includes(["TRACE", "DEBUG"], .metadata.level)'
+
+ extended_logs_vector:
+ inputs:
+ - filtered_logs_vector
+ type: remap
+ source: |
+ .container = "vector"
+ .level = .metadata.level
+ .logger = .metadata.module_path
+ if exists(.file) { .processed_file = del(.file) }
+ del(.metadata)
+ del(.pid)
+ del(.source_type)
+
+ extended_logs:
+ inputs:
+ - extended_logs_*
+ type: remap
+ source: |
+ .namespace = "$NAMESPACE"
+ .cluster = "hdfs"
+ .role = "datanode"
+ .roleGroup = "default"
+
+ sinks:
+ aggregator:
+ inputs:
+ - extended_logs
+ type: vector
+ address: $VECTOR_AGGREGATOR_ADDRESS
+{% endif -%}
+wait-for-namenodes.log4j.properties: |+
+ log4j.rootLogger=INFO, CONSOLE, FILE
+
+ log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+ log4j.appender.CONSOLE.Threshold=INFO
+ log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+ log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+ log4j.appender.FILE=org.apache.log4j.RollingFileAppender
+ log4j.appender.FILE.Threshold=INFO
+ log4j.appender.FILE.File=/stackable/log/wait-for-namenodes/wait-for-namenodes.log4j.xml
+ log4j.appender.FILE.MaxFileSize=5MB
+ log4j.appender.FILE.MaxBackupIndex=1
+ log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout
+
diff --git a/tests/templates/kuttl/smoke/31_configmap_hdfs-journalnode-default.yaml b/tests/templates/kuttl/smoke/31_configmap_hdfs-journalnode-default.yaml
deleted file mode 100644
index 8600ee57..00000000
--- a/tests/templates/kuttl/smoke/31_configmap_hdfs-journalnode-default.yaml
+++ /dev/null
@@ -1,167 +0,0 @@
----
-core-site.xml: |-
-
-
-
- fs.defaultFS
- hdfs://hdfs/
-
-
- ha.zookeeper.quorum
- ${env.ZOOKEEPER}
-
-
- hadoop.prometheus.endpoint.enabled
- true
-
-
- io.file.buffer.size
- 131072
-
-
-hadoop-policy.xml: |-
-
-
-
-hdfs-site.xml: |-
-
-
-
- dfs.client.failover.proxy.provider.hdfs
- org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
-
-
- dfs.datanode.handler.count
- 50
-
-
- dfs.datanode.max.transfer.threads
- 8192
-
-
- dfs.datanode.registered.hostname
- ${env.POD_ADDRESS}
-
-
- dfs.datanode.registered.http.port
- ${env.HTTP_PORT}
-
-
- dfs.datanode.registered.ipc.port
- ${env.IPC_PORT}
-
-
- dfs.datanode.registered.port
- ${env.DATA_PORT}
-
-
- dfs.datanode.sync.behind.writes
- true
-
-
- dfs.datanode.synconclose
- true
-
-
- dfs.ha.automatic-failover.enabled
- true
-
-
- dfs.ha.fencing.methods
- shell(/bin/true)
-
-
- dfs.ha.namenode.id
- ${env.POD_NAME}
-
-
- dfs.ha.namenodes.hdfs
- hdfs-namenode-default-0,hdfs-namenode-default-1
-
-
- dfs.journalnode.edits.dir
- /stackable/data/journalnode
-
-
- dfs.namenode.datanode.registration.unsafe.allow-address-override
- true
-
-
- dfs.namenode.handler.count
- 50
-
-
- dfs.namenode.http-address.hdfs.hdfs-namenode-default-0
- hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870
-
-
- dfs.namenode.http-address.hdfs.hdfs-namenode-default-1
- hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870
-
-
- dfs.namenode.name.dir
- /stackable/data/namenode
-
-
- dfs.namenode.name.dir.hdfs.hdfs-namenode-default-0
- /stackable/data/namenode
-
-
- dfs.namenode.name.dir.hdfs.hdfs-namenode-default-1
- /stackable/data/namenode
-
-
- dfs.namenode.replication.max-streams
- 4
-
-
- dfs.namenode.replication.max-streams-hard-limit
- 8
-
-
- dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-0
- hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020
-
-
- dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-1
- hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020
-
-
- dfs.namenode.shared.edits.dir
- qjournal://hdfs-journalnode-default-0.hdfs-journalnode-default.$NAMESPACE.svc.cluster.local:8485/hdfs
-
-
- dfs.nameservices
- hdfs
-
-
- dfs.replication
- 1
-
-
-hdfs.log4j.properties: |+
- log4j.rootLogger=INFO, CONSOLE, FILE
-
- log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
- log4j.appender.CONSOLE.Threshold=INFO
- log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
- log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
-
- log4j.appender.FILE=org.apache.log4j.RollingFileAppender
- log4j.appender.FILE.Threshold=INFO
- log4j.appender.FILE.File=/stackable/log/hdfs/hdfs.log4j.xml
- log4j.appender.FILE.MaxFileSize=5MB
- log4j.appender.FILE.MaxBackupIndex=1
- log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout
-
-security.properties: |
- networkaddress.cache.negative.ttl=0
- networkaddress.cache.ttl=30
-ssl-client.xml: |-
-
-
-
-ssl-server.xml: |-
-
-
-
diff --git a/tests/templates/kuttl/smoke/31_configmap_hdfs-journalnode-default.yaml.j2 b/tests/templates/kuttl/smoke/31_configmap_hdfs-journalnode-default.yaml.j2
new file mode 100644
index 00000000..fdc777f8
--- /dev/null
+++ b/tests/templates/kuttl/smoke/31_configmap_hdfs-journalnode-default.yaml.j2
@@ -0,0 +1,767 @@
+core-site.xml: |-
+
+
+
+ fs.defaultFS
+ hdfs://hdfs/
+
+
+ ha.zookeeper.quorum
+ ${env.ZOOKEEPER}
+
+
+ hadoop.prometheus.endpoint.enabled
+ true
+
+
+ io.file.buffer.size
+ 131072
+
+
+hadoop-policy.xml: |-
+
+
+
+hdfs-site.xml: |-
+
+
+
+ dfs.client.failover.proxy.provider.hdfs
+ org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
+
+
+ dfs.datanode.handler.count
+ 50
+
+
+ dfs.datanode.max.transfer.threads
+ 8192
+
+
+ dfs.datanode.registered.hostname
+ ${env.POD_ADDRESS}
+
+
+ dfs.datanode.registered.http.port
+ ${env.HTTP_PORT}
+
+
+ dfs.datanode.registered.ipc.port
+ ${env.IPC_PORT}
+
+
+ dfs.datanode.registered.port
+ ${env.DATA_PORT}
+
+
+ dfs.datanode.sync.behind.writes
+ true
+
+
+ dfs.datanode.synconclose
+ true
+
+
+ dfs.ha.automatic-failover.enabled
+ true
+
+
+ dfs.ha.fencing.methods
+ shell(/bin/true)
+
+
+ dfs.ha.namenode.id
+ ${env.POD_NAME}
+
+
+ dfs.ha.namenodes.hdfs
+ hdfs-namenode-default-0,hdfs-namenode-default-1
+
+
+ dfs.journalnode.edits.dir
+ /stackable/data/journalnode
+
+
+ dfs.namenode.datanode.registration.unsafe.allow-address-override
+ true
+
+
+ dfs.namenode.handler.count
+ 50
+
+
+ dfs.namenode.http-address.hdfs.hdfs-namenode-default-0
+ hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870
+
+
+ dfs.namenode.http-address.hdfs.hdfs-namenode-default-1
+ hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870
+
+
+ dfs.namenode.name.dir
+ /stackable/data/namenode
+
+
+ dfs.namenode.name.dir.hdfs.hdfs-namenode-default-0
+ /stackable/data/namenode
+
+
+ dfs.namenode.name.dir.hdfs.hdfs-namenode-default-1
+ /stackable/data/namenode
+
+
+ dfs.namenode.replication.max-streams
+ 4
+
+
+ dfs.namenode.replication.max-streams-hard-limit
+ 8
+
+
+ dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-0
+ hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020
+
+
+ dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-1
+ hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020
+
+
+ dfs.namenode.shared.edits.dir
+ qjournal://hdfs-journalnode-default-0.hdfs-journalnode-default.$NAMESPACE.svc.cluster.local:8485/hdfs
+
+
+ dfs.nameservices
+ hdfs
+
+
+ dfs.replication
+ 1
+
+
+hdfs.log4j.properties: |+
+ log4j.rootLogger=INFO, CONSOLE, FILE
+
+ log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+ log4j.appender.CONSOLE.Threshold=INFO
+ log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+ log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+ log4j.appender.FILE=org.apache.log4j.RollingFileAppender
+ log4j.appender.FILE.Threshold=INFO
+ log4j.appender.FILE.File=/stackable/log/hdfs/hdfs.log4j.xml
+ log4j.appender.FILE.MaxFileSize=5MB
+ log4j.appender.FILE.MaxBackupIndex=1
+ log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout
+
+security.properties: |
+ networkaddress.cache.negative.ttl=0
+ networkaddress.cache.ttl=30
+ssl-client.xml: |-
+
+
+
+ssl-server.xml: |-
+
+
+
+{%- if lookup('env', 'VECTOR_AGGREGATOR') %}
+
+vector.yaml: |
+ data_dir: /stackable/log/_vector-state
+
+ log_schema:
+ host_key: pod
+
+ sources:
+ vector:
+ type: internal_logs
+
+ files_stdout:
+ type: file
+ include:
+ - /stackable/log/*/*.stdout.log
+
+ files_stderr:
+ type: file
+ include:
+ - /stackable/log/*/*.stderr.log
+
+ files_log4j:
+ type: file
+ include:
+ - /stackable/log/*/*.log4j.xml
+ line_delimiter: "\r\n"
+ multiline:
+ mode: halt_before
+ start_pattern: ^" + raw_message + ""
+ parsed_event, err = parse_xml(wrapped_xml_event)
+ if err != null {
+ error = "XML not parsable: " + err
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ root = object!(parsed_event.root)
+ if !is_object(root.event) {
+ error = "Parsed event contains no \"event\" tag."
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ if keys(root) != ["event"] {
+ .errors = push(.errors, "Parsed event contains multiple tags: " + join!(keys(root), ", "))
+ }
+ event = object!(root.event)
+
+ epoch_milliseconds, err = to_int(event.@timestamp)
+ if err == null && epoch_milliseconds != 0 {
+ converted_timestamp, err = from_unix_timestamp(epoch_milliseconds, "milliseconds")
+ if err == null {
+ .timestamp = converted_timestamp
+ } else {
+ .errors = push(.errors, "Time not parsable, using current time instead: " + err)
+ }
+ } else {
+ .errors = push(.errors, "Timestamp not found, using current time instead.")
+ }
+
+ .logger, err = string(event.@logger)
+ if err != null || is_empty(.logger) {
+ .errors = push(.errors, "Logger not found.")
+ }
+
+ level, err = string(event.@level)
+ if err != null {
+ .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.")
+ } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) {
+ .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.")
+ } else {
+ .level = level
+ }
+
+ message, err = string(event.message)
+ if err != null || is_empty(message) {
+ .errors = push(.errors, "Message not found.")
+ }
+ throwable = string(event.throwable) ?? ""
+ .message = join!(compact([message, throwable]), "\n")
+ }
+ }
+
+ processed_files_log4j2:
+ inputs:
+ - files_log4j2
+ type: remap
+ source: |
+ raw_message = string!(.message)
+
+ .timestamp = now()
+ .logger = ""
+ .level = "INFO"
+ .message = ""
+ .errors = []
+
+ event = {}
+ parsed_event, err = parse_xml(raw_message)
+ if err != null {
+ error = "XML not parsable: " + err
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ if !is_object(parsed_event.Event) {
+ error = "Parsed event contains no \"Event\" tag."
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ event = object!(parsed_event.Event)
+
+ tag_instant_valid = false
+ instant, err = object(event.Instant)
+ if err == null {
+ epoch_nanoseconds, err = to_int(instant.@epochSecond) * 1_000_000_000 + to_int(instant.@nanoOfSecond)
+ if err == null && epoch_nanoseconds != 0 {
+ converted_timestamp, err = from_unix_timestamp(epoch_nanoseconds, "nanoseconds")
+ if err == null {
+ .timestamp = converted_timestamp
+ tag_instant_valid = true
+ } else {
+ .errors = push(.errors, "Instant invalid, trying property timeMillis instead: " + err)
+ }
+ } else {
+ .errors = push(.errors, "Instant invalid, trying property timeMillis instead: " + err)
+ }
+ }
+ if !tag_instant_valid {
+ epoch_milliseconds, err = to_int(event.@timeMillis)
+ if err == null && epoch_milliseconds != 0 {
+ converted_timestamp, err = from_unix_timestamp(epoch_milliseconds, "milliseconds")
+ if err == null {
+ .timestamp = converted_timestamp
+ } else {
+ .errors = push(.errors, "timeMillis not parsable, using current time instead: " + err)
+ }
+ } else {
+ .errors = push(.errors, "timeMillis not parsable, using current time instead: " + err)
+ }
+ }
+
+ .logger, err = string(event.@loggerName)
+ if err != null || is_empty(.logger) {
+ .errors = push(.errors, "Logger not found.")
+ }
+
+ level, err = string(event.@level)
+ if err != null {
+ .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.")
+ } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) {
+ .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.")
+ } else {
+ .level = level
+ }
+
+ exception = null
+ thrown = event.Thrown
+ if is_object(thrown) {
+ exception = "Exception"
+ thread, err = string(event.@thread)
+ if err == null && !is_empty(thread) {
+ exception = exception + " in thread \"" + thread + "\""
+ }
+ thrown_name, err = string(thrown.@name)
+ if err == null && !is_empty(exception) {
+ exception = exception + " " + thrown_name
+ }
+ message = string(thrown.@localizedMessage) ??
+ string(thrown.@message) ??
+ ""
+ if !is_empty(message) {
+ exception = exception + ": " + message
+ }
+ stacktrace_items = array(thrown.ExtendedStackTrace.ExtendedStackTraceItem) ?? []
+ stacktrace = ""
+ for_each(stacktrace_items) -> |_index, value| {
+ stacktrace = stacktrace + " "
+ class = string(value.@class) ?? ""
+ method = string(value.@method) ?? ""
+ if !is_empty(class) && !is_empty(method) {
+ stacktrace = stacktrace + "at " + class + "." + method
+ }
+ file = string(value.@file) ?? ""
+ line = string(value.@line) ?? ""
+ if !is_empty(file) && !is_empty(line) {
+ stacktrace = stacktrace + "(" + file + ":" + line + ")"
+ }
+ exact = to_bool(value.@exact) ?? false
+ location = string(value.@location) ?? ""
+ version = string(value.@version) ?? ""
+ if !is_empty(location) && !is_empty(version) {
+ stacktrace = stacktrace + " "
+ if !exact {
+ stacktrace = stacktrace + "~"
+ }
+ stacktrace = stacktrace + "[" + location + ":" + version + "]"
+ }
+ stacktrace = stacktrace + "\n"
+ }
+ if stacktrace != "" {
+ exception = exception + "\n" + stacktrace
+ }
+ }
+
+ message, err = string(event.Message)
+ if err != null || is_empty(message) {
+ message = null
+ .errors = push(.errors, "Message not found.")
+ }
+ .message = join!(compact([message, exception]), "\n")
+ }
+ }
+
+ processed_files_py:
+ inputs:
+ - files_py
+ type: remap
+ source: |
+ raw_message = string!(.message)
+
+ .timestamp = now()
+ .logger = ""
+ .level = "INFO"
+ .message = ""
+ .errors = []
+
+ parsed_event, err = parse_json(raw_message)
+ if err != null {
+ error = "JSON not parsable: " + err
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else if !is_object(parsed_event) {
+ error = "Parsed event is not a JSON object."
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ event = object!(parsed_event)
+
+ asctime, err = string(event.asctime)
+ if err == null {
+ parsed_timestamp, err = parse_timestamp(asctime, "%F %T,%3f")
+ if err == null {
+ .timestamp = parsed_timestamp
+ } else {
+ .errors = push(.errors, "Timestamp not parsable, using current time instead: "+ err)
+ }
+ } else {
+ .errors = push(.errors, "Timestamp not found, using current time instead.")
+ }
+
+ .logger, err = string(event.name)
+ if err != null || is_empty(.logger) {
+ .errors = push(.errors, "Logger not found.")
+ }
+
+ level, err = string(event.levelname)
+ if err != null {
+ .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.")
+ } else if level == "DEBUG" {
+ .level = "DEBUG"
+ } else if level == "INFO" {
+ .level = "INFO"
+ } else if level == "WARNING" {
+ .level = "WARN"
+ } else if level == "ERROR" {
+ .level = "ERROR"
+ } else if level == "CRITICAL" {
+ .level = "FATAL"
+ } else {
+ .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.")
+ }
+
+ .message, err = string(event.message)
+ if err != null || is_empty(.message) {
+ .errors = push(.errors, "Message not found.")
+ }
+ }
+
+ processed_files_airlift:
+ inputs:
+ - files_airlift
+ type: remap
+ source: |
+ raw_message = string!(.message)
+
+ .timestamp = now()
+ .logger = ""
+ .level = "INFO"
+ .message = ""
+ .errors = []
+
+ parsed_event, err = parse_json(raw_message)
+ if err != null {
+ error = "JSON not parsable: " + err
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else if !is_object(parsed_event) {
+ error = "Parsed event is not a JSON object."
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ event = object!(parsed_event)
+
+ timestamp_string, err = string(event.timestamp)
+ if err == null {
+ parsed_timestamp, err = parse_timestamp(timestamp_string, "%Y-%m-%dT%H:%M:%S.%fZ")
+ if err == null {
+ .timestamp = parsed_timestamp
+ } else {
+ .errors = push(.errors, "Timestamp not parsable, using current time instead: " + err)
+ }
+ } else {
+ .errors = push(.errors, "Timestamp not found, using current time instead.")
+ }
+
+ .logger, err = string(event.logger)
+ if err != null || is_empty(.logger) {
+ .errors = push(.errors, "Logger not found.")
+ }
+
+ level, err = string(event.level)
+ if err != null {
+ .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.")
+ } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) {
+ .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.")
+ } else {
+ .level = level
+ }
+
+ .thread = string(parsed_event.thread) ?? null
+
+ .message, err = string(event.message)
+ if err != null || is_empty(.message) {
+ .errors = push(.errors, "Message not found.")
+ }
+ stacktrace = string(event.stackTrace) ?? ""
+ .message = join!(compact([.message, stacktrace]), "\n\n")
+ }
+
+ extended_logs_files:
+ inputs:
+ - processed_files_*
+ type: remap
+ source: |
+ del(.source_type)
+ if .errors == [] {
+ del(.errors)
+ }
+ . |= parse_regex!(.file, r'^/stackable/log/(?P.*?)/(?P.*?)$')
+
+ filtered_logs_vector:
+ inputs:
+ - vector
+ type: filter
+ condition: '!includes(["TRACE", "DEBUG"], .metadata.level)'
+
+ extended_logs_vector:
+ inputs:
+ - filtered_logs_vector
+ type: remap
+ source: |
+ .container = "vector"
+ .level = .metadata.level
+ .logger = .metadata.module_path
+ if exists(.file) { .processed_file = del(.file) }
+ del(.metadata)
+ del(.pid)
+ del(.source_type)
+
+ extended_logs:
+ inputs:
+ - extended_logs_*
+ type: remap
+ source: |
+ .namespace = "$NAMESPACE"
+ .cluster = "hdfs"
+ .role = "journalnode"
+ .roleGroup = "default"
+
+ sinks:
+ aggregator:
+ inputs:
+ - extended_logs
+ type: vector
+ address: $VECTOR_AGGREGATOR_ADDRESS
+{%- endif -%}
diff --git a/tests/templates/kuttl/smoke/31_configmap_hdfs-namenode-default.yaml b/tests/templates/kuttl/smoke/31_configmap_hdfs-namenode-default.yaml
deleted file mode 100644
index ffb249ae..00000000
--- a/tests/templates/kuttl/smoke/31_configmap_hdfs-namenode-default.yaml
+++ /dev/null
@@ -1,211 +0,0 @@
----
-core-site.xml: |-
-
-
-
- fs.defaultFS
- hdfs://hdfs/
-
-
- ha.zookeeper.quorum
- ${env.ZOOKEEPER}
-
-
- hadoop.prometheus.endpoint.enabled
- true
-
-
- io.file.buffer.size
- 131072
-
-
-format-namenodes.log4j.properties: |+
- log4j.rootLogger=INFO, CONSOLE, FILE
-
- log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
- log4j.appender.CONSOLE.Threshold=INFO
- log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
- log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
-
- log4j.appender.FILE=org.apache.log4j.RollingFileAppender
- log4j.appender.FILE.Threshold=INFO
- log4j.appender.FILE.File=/stackable/log/format-namenodes/format-namenodes.log4j.xml
- log4j.appender.FILE.MaxFileSize=5MB
- log4j.appender.FILE.MaxBackupIndex=1
- log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout
-
-format-zookeeper.log4j.properties: |+
- log4j.rootLogger=INFO, CONSOLE, FILE
-
- log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
- log4j.appender.CONSOLE.Threshold=INFO
- log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
- log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
-
- log4j.appender.FILE=org.apache.log4j.RollingFileAppender
- log4j.appender.FILE.Threshold=INFO
- log4j.appender.FILE.File=/stackable/log/format-zookeeper/format-zookeeper.log4j.xml
- log4j.appender.FILE.MaxFileSize=5MB
- log4j.appender.FILE.MaxBackupIndex=1
- log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout
-
-hadoop-policy.xml: |-
-
-
-
-hdfs-site.xml: |-
-
-
-
- dfs.client.failover.proxy.provider.hdfs
- org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
-
-
- dfs.datanode.handler.count
- 50
-
-
- dfs.datanode.max.transfer.threads
- 8192
-
-
- dfs.datanode.registered.hostname
- ${env.POD_ADDRESS}
-
-
- dfs.datanode.registered.http.port
- ${env.HTTP_PORT}
-
-
- dfs.datanode.registered.ipc.port
- ${env.IPC_PORT}
-
-
- dfs.datanode.registered.port
- ${env.DATA_PORT}
-
-
- dfs.datanode.sync.behind.writes
- true
-
-
- dfs.datanode.synconclose
- true
-
-
- dfs.ha.automatic-failover.enabled
- true
-
-
- dfs.ha.fencing.methods
- shell(/bin/true)
-
-
- dfs.ha.namenode.id
- ${env.POD_NAME}
-
-
- dfs.ha.namenodes.hdfs
- hdfs-namenode-default-0,hdfs-namenode-default-1
-
-
- dfs.journalnode.edits.dir
- /stackable/data/journalnode
-
-
- dfs.namenode.datanode.registration.unsafe.allow-address-override
- true
-
-
- dfs.namenode.handler.count
- 50
-
-
- dfs.namenode.http-address.hdfs.hdfs-namenode-default-0
- hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870
-
-
- dfs.namenode.http-address.hdfs.hdfs-namenode-default-1
- hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870
-
-
- dfs.namenode.name.dir
- /stackable/data/namenode
-
-
- dfs.namenode.name.dir.hdfs.hdfs-namenode-default-0
- /stackable/data/namenode
-
-
- dfs.namenode.name.dir.hdfs.hdfs-namenode-default-1
- /stackable/data/namenode
-
-
- dfs.namenode.replication.max-streams
- 4
-
-
- dfs.namenode.replication.max-streams-hard-limit
- 8
-
-
- dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-0
- hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020
-
-
- dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-1
- hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020
-
-
- dfs.namenode.shared.edits.dir
- qjournal://hdfs-journalnode-default-0.hdfs-journalnode-default.$NAMESPACE.svc.cluster.local:8485/hdfs
-
-
- dfs.nameservices
- hdfs
-
-
- dfs.replication
- 1
-
-
-hdfs.log4j.properties: |+
- log4j.rootLogger=INFO, CONSOLE, FILE
-
- log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
- log4j.appender.CONSOLE.Threshold=INFO
- log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
- log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
-
- log4j.appender.FILE=org.apache.log4j.RollingFileAppender
- log4j.appender.FILE.Threshold=INFO
- log4j.appender.FILE.File=/stackable/log/hdfs/hdfs.log4j.xml
- log4j.appender.FILE.MaxFileSize=5MB
- log4j.appender.FILE.MaxBackupIndex=1
- log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout
-
-security.properties: |
- networkaddress.cache.negative.ttl=0
- networkaddress.cache.ttl=30
-ssl-client.xml: |-
-
-
-
-ssl-server.xml: |-
-
-
-
-zkfc.log4j.properties: |+
- log4j.rootLogger=INFO, CONSOLE, FILE
-
- log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
- log4j.appender.CONSOLE.Threshold=INFO
- log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
- log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
-
- log4j.appender.FILE=org.apache.log4j.RollingFileAppender
- log4j.appender.FILE.Threshold=INFO
- log4j.appender.FILE.File=/stackable/log/zkfc/zkfc.log4j.xml
- log4j.appender.FILE.MaxFileSize=5MB
- log4j.appender.FILE.MaxBackupIndex=1
- log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout
diff --git a/tests/templates/kuttl/smoke/31_configmap_hdfs-namenode-default.yaml.j2 b/tests/templates/kuttl/smoke/31_configmap_hdfs-namenode-default.yaml.j2
new file mode 100644
index 00000000..03071e51
--- /dev/null
+++ b/tests/templates/kuttl/smoke/31_configmap_hdfs-namenode-default.yaml.j2
@@ -0,0 +1,811 @@
+core-site.xml: |-
+
+
+
+ fs.defaultFS
+ hdfs://hdfs/
+
+
+ ha.zookeeper.quorum
+ ${env.ZOOKEEPER}
+
+
+ hadoop.prometheus.endpoint.enabled
+ true
+
+
+ io.file.buffer.size
+ 131072
+
+
+format-namenodes.log4j.properties: |+
+ log4j.rootLogger=INFO, CONSOLE, FILE
+
+ log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+ log4j.appender.CONSOLE.Threshold=INFO
+ log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+ log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+ log4j.appender.FILE=org.apache.log4j.RollingFileAppender
+ log4j.appender.FILE.Threshold=INFO
+ log4j.appender.FILE.File=/stackable/log/format-namenodes/format-namenodes.log4j.xml
+ log4j.appender.FILE.MaxFileSize=5MB
+ log4j.appender.FILE.MaxBackupIndex=1
+ log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout
+
+format-zookeeper.log4j.properties: |+
+ log4j.rootLogger=INFO, CONSOLE, FILE
+
+ log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+ log4j.appender.CONSOLE.Threshold=INFO
+ log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+ log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+ log4j.appender.FILE=org.apache.log4j.RollingFileAppender
+ log4j.appender.FILE.Threshold=INFO
+ log4j.appender.FILE.File=/stackable/log/format-zookeeper/format-zookeeper.log4j.xml
+ log4j.appender.FILE.MaxFileSize=5MB
+ log4j.appender.FILE.MaxBackupIndex=1
+ log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout
+
+hadoop-policy.xml: |-
+
+
+
+hdfs-site.xml: |-
+
+
+
+ dfs.client.failover.proxy.provider.hdfs
+ org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
+
+
+ dfs.datanode.handler.count
+ 50
+
+
+ dfs.datanode.max.transfer.threads
+ 8192
+
+
+ dfs.datanode.registered.hostname
+ ${env.POD_ADDRESS}
+
+
+ dfs.datanode.registered.http.port
+ ${env.HTTP_PORT}
+
+
+ dfs.datanode.registered.ipc.port
+ ${env.IPC_PORT}
+
+
+ dfs.datanode.registered.port
+ ${env.DATA_PORT}
+
+
+ dfs.datanode.sync.behind.writes
+ true
+
+
+ dfs.datanode.synconclose
+ true
+
+
+ dfs.ha.automatic-failover.enabled
+ true
+
+
+ dfs.ha.fencing.methods
+ shell(/bin/true)
+
+
+ dfs.ha.namenode.id
+ ${env.POD_NAME}
+
+
+ dfs.ha.namenodes.hdfs
+ hdfs-namenode-default-0,hdfs-namenode-default-1
+
+
+ dfs.journalnode.edits.dir
+ /stackable/data/journalnode
+
+
+ dfs.namenode.datanode.registration.unsafe.allow-address-override
+ true
+
+
+ dfs.namenode.handler.count
+ 50
+
+
+ dfs.namenode.http-address.hdfs.hdfs-namenode-default-0
+ hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870
+
+
+ dfs.namenode.http-address.hdfs.hdfs-namenode-default-1
+ hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870
+
+
+ dfs.namenode.name.dir
+ /stackable/data/namenode
+
+
+ dfs.namenode.name.dir.hdfs.hdfs-namenode-default-0
+ /stackable/data/namenode
+
+
+ dfs.namenode.name.dir.hdfs.hdfs-namenode-default-1
+ /stackable/data/namenode
+
+
+ dfs.namenode.replication.max-streams
+ 4
+
+
+ dfs.namenode.replication.max-streams-hard-limit
+ 8
+
+
+ dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-0
+ hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020
+
+
+ dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-1
+ hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020
+
+
+ dfs.namenode.shared.edits.dir
+ qjournal://hdfs-journalnode-default-0.hdfs-journalnode-default.$NAMESPACE.svc.cluster.local:8485/hdfs
+
+
+ dfs.nameservices
+ hdfs
+
+
+ dfs.replication
+ 1
+
+
+hdfs.log4j.properties: |+
+ log4j.rootLogger=INFO, CONSOLE, FILE
+
+ log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+ log4j.appender.CONSOLE.Threshold=INFO
+ log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+ log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+ log4j.appender.FILE=org.apache.log4j.RollingFileAppender
+ log4j.appender.FILE.Threshold=INFO
+ log4j.appender.FILE.File=/stackable/log/hdfs/hdfs.log4j.xml
+ log4j.appender.FILE.MaxFileSize=5MB
+ log4j.appender.FILE.MaxBackupIndex=1
+ log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout
+
+security.properties: |
+ networkaddress.cache.negative.ttl=0
+ networkaddress.cache.ttl=30
+ssl-client.xml: |-
+
+
+
+ssl-server.xml: |-
+
+
+
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
+vector.yaml: |
+ data_dir: /stackable/log/_vector-state
+
+ log_schema:
+ host_key: pod
+
+ sources:
+ vector:
+ type: internal_logs
+
+ files_stdout:
+ type: file
+ include:
+ - /stackable/log/*/*.stdout.log
+
+ files_stderr:
+ type: file
+ include:
+ - /stackable/log/*/*.stderr.log
+
+ files_log4j:
+ type: file
+ include:
+ - /stackable/log/*/*.log4j.xml
+ line_delimiter: "\r\n"
+ multiline:
+ mode: halt_before
+ start_pattern: ^" + raw_message + ""
+ parsed_event, err = parse_xml(wrapped_xml_event)
+ if err != null {
+ error = "XML not parsable: " + err
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ root = object!(parsed_event.root)
+ if !is_object(root.event) {
+ error = "Parsed event contains no \"event\" tag."
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ if keys(root) != ["event"] {
+ .errors = push(.errors, "Parsed event contains multiple tags: " + join!(keys(root), ", "))
+ }
+ event = object!(root.event)
+
+ epoch_milliseconds, err = to_int(event.@timestamp)
+ if err == null && epoch_milliseconds != 0 {
+ converted_timestamp, err = from_unix_timestamp(epoch_milliseconds, "milliseconds")
+ if err == null {
+ .timestamp = converted_timestamp
+ } else {
+ .errors = push(.errors, "Time not parsable, using current time instead: " + err)
+ }
+ } else {
+ .errors = push(.errors, "Timestamp not found, using current time instead.")
+ }
+
+ .logger, err = string(event.@logger)
+ if err != null || is_empty(.logger) {
+ .errors = push(.errors, "Logger not found.")
+ }
+
+ level, err = string(event.@level)
+ if err != null {
+ .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.")
+ } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) {
+ .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.")
+ } else {
+ .level = level
+ }
+
+ message, err = string(event.message)
+ if err != null || is_empty(message) {
+ .errors = push(.errors, "Message not found.")
+ }
+ throwable = string(event.throwable) ?? ""
+ .message = join!(compact([message, throwable]), "\n")
+ }
+ }
+
+ processed_files_log4j2:
+ inputs:
+ - files_log4j2
+ type: remap
+ source: |
+ raw_message = string!(.message)
+
+ .timestamp = now()
+ .logger = ""
+ .level = "INFO"
+ .message = ""
+ .errors = []
+
+ event = {}
+ parsed_event, err = parse_xml(raw_message)
+ if err != null {
+ error = "XML not parsable: " + err
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ if !is_object(parsed_event.Event) {
+ error = "Parsed event contains no \"Event\" tag."
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ event = object!(parsed_event.Event)
+
+ tag_instant_valid = false
+ instant, err = object(event.Instant)
+ if err == null {
+ epoch_nanoseconds, err = to_int(instant.@epochSecond) * 1_000_000_000 + to_int(instant.@nanoOfSecond)
+ if err == null && epoch_nanoseconds != 0 {
+ converted_timestamp, err = from_unix_timestamp(epoch_nanoseconds, "nanoseconds")
+ if err == null {
+ .timestamp = converted_timestamp
+ tag_instant_valid = true
+ } else {
+ .errors = push(.errors, "Instant invalid, trying property timeMillis instead: " + err)
+ }
+ } else {
+ .errors = push(.errors, "Instant invalid, trying property timeMillis instead: " + err)
+ }
+ }
+ if !tag_instant_valid {
+ epoch_milliseconds, err = to_int(event.@timeMillis)
+ if err == null && epoch_milliseconds != 0 {
+ converted_timestamp, err = from_unix_timestamp(epoch_milliseconds, "milliseconds")
+ if err == null {
+ .timestamp = converted_timestamp
+ } else {
+ .errors = push(.errors, "timeMillis not parsable, using current time instead: " + err)
+ }
+ } else {
+ .errors = push(.errors, "timeMillis not parsable, using current time instead: " + err)
+ }
+ }
+
+ .logger, err = string(event.@loggerName)
+ if err != null || is_empty(.logger) {
+ .errors = push(.errors, "Logger not found.")
+ }
+
+ level, err = string(event.@level)
+ if err != null {
+ .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.")
+ } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) {
+ .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.")
+ } else {
+ .level = level
+ }
+
+ exception = null
+ thrown = event.Thrown
+ if is_object(thrown) {
+ exception = "Exception"
+ thread, err = string(event.@thread)
+ if err == null && !is_empty(thread) {
+ exception = exception + " in thread \"" + thread + "\""
+ }
+ thrown_name, err = string(thrown.@name)
+ if err == null && !is_empty(exception) {
+ exception = exception + " " + thrown_name
+ }
+ message = string(thrown.@localizedMessage) ??
+ string(thrown.@message) ??
+ ""
+ if !is_empty(message) {
+ exception = exception + ": " + message
+ }
+ stacktrace_items = array(thrown.ExtendedStackTrace.ExtendedStackTraceItem) ?? []
+ stacktrace = ""
+ for_each(stacktrace_items) -> |_index, value| {
+ stacktrace = stacktrace + " "
+ class = string(value.@class) ?? ""
+ method = string(value.@method) ?? ""
+ if !is_empty(class) && !is_empty(method) {
+ stacktrace = stacktrace + "at " + class + "." + method
+ }
+ file = string(value.@file) ?? ""
+ line = string(value.@line) ?? ""
+ if !is_empty(file) && !is_empty(line) {
+ stacktrace = stacktrace + "(" + file + ":" + line + ")"
+ }
+ exact = to_bool(value.@exact) ?? false
+ location = string(value.@location) ?? ""
+ version = string(value.@version) ?? ""
+ if !is_empty(location) && !is_empty(version) {
+ stacktrace = stacktrace + " "
+ if !exact {
+ stacktrace = stacktrace + "~"
+ }
+ stacktrace = stacktrace + "[" + location + ":" + version + "]"
+ }
+ stacktrace = stacktrace + "\n"
+ }
+ if stacktrace != "" {
+ exception = exception + "\n" + stacktrace
+ }
+ }
+
+ message, err = string(event.Message)
+ if err != null || is_empty(message) {
+ message = null
+ .errors = push(.errors, "Message not found.")
+ }
+ .message = join!(compact([message, exception]), "\n")
+ }
+ }
+
+ processed_files_py:
+ inputs:
+ - files_py
+ type: remap
+ source: |
+ raw_message = string!(.message)
+
+ .timestamp = now()
+ .logger = ""
+ .level = "INFO"
+ .message = ""
+ .errors = []
+
+ parsed_event, err = parse_json(raw_message)
+ if err != null {
+ error = "JSON not parsable: " + err
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else if !is_object(parsed_event) {
+ error = "Parsed event is not a JSON object."
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ event = object!(parsed_event)
+
+ asctime, err = string(event.asctime)
+ if err == null {
+ parsed_timestamp, err = parse_timestamp(asctime, "%F %T,%3f")
+ if err == null {
+ .timestamp = parsed_timestamp
+ } else {
+ .errors = push(.errors, "Timestamp not parsable, using current time instead: "+ err)
+ }
+ } else {
+ .errors = push(.errors, "Timestamp not found, using current time instead.")
+ }
+
+ .logger, err = string(event.name)
+ if err != null || is_empty(.logger) {
+ .errors = push(.errors, "Logger not found.")
+ }
+
+ level, err = string(event.levelname)
+ if err != null {
+ .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.")
+ } else if level == "DEBUG" {
+ .level = "DEBUG"
+ } else if level == "INFO" {
+ .level = "INFO"
+ } else if level == "WARNING" {
+ .level = "WARN"
+ } else if level == "ERROR" {
+ .level = "ERROR"
+ } else if level == "CRITICAL" {
+ .level = "FATAL"
+ } else {
+ .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.")
+ }
+
+ .message, err = string(event.message)
+ if err != null || is_empty(.message) {
+ .errors = push(.errors, "Message not found.")
+ }
+ }
+
+ processed_files_airlift:
+ inputs:
+ - files_airlift
+ type: remap
+ source: |
+ raw_message = string!(.message)
+
+ .timestamp = now()
+ .logger = ""
+ .level = "INFO"
+ .message = ""
+ .errors = []
+
+ parsed_event, err = parse_json(raw_message)
+ if err != null {
+ error = "JSON not parsable: " + err
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else if !is_object(parsed_event) {
+ error = "Parsed event is not a JSON object."
+ .errors = push(.errors, error)
+ log(error, level: "warn")
+ .message = raw_message
+ } else {
+ event = object!(parsed_event)
+
+ timestamp_string, err = string(event.timestamp)
+ if err == null {
+ parsed_timestamp, err = parse_timestamp(timestamp_string, "%Y-%m-%dT%H:%M:%S.%fZ")
+ if err == null {
+ .timestamp = parsed_timestamp
+ } else {
+ .errors = push(.errors, "Timestamp not parsable, using current time instead: " + err)
+ }
+ } else {
+ .errors = push(.errors, "Timestamp not found, using current time instead.")
+ }
+
+ .logger, err = string(event.logger)
+ if err != null || is_empty(.logger) {
+ .errors = push(.errors, "Logger not found.")
+ }
+
+ level, err = string(event.level)
+ if err != null {
+ .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.")
+ } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) {
+ .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.")
+ } else {
+ .level = level
+ }
+
+ .thread = string(parsed_event.thread) ?? null
+
+ .message, err = string(event.message)
+ if err != null || is_empty(.message) {
+ .errors = push(.errors, "Message not found.")
+ }
+ stacktrace = string(event.stackTrace) ?? ""
+ .message = join!(compact([.message, stacktrace]), "\n\n")
+ }
+
+ extended_logs_files:
+ inputs:
+ - processed_files_*
+ type: remap
+ source: |
+ del(.source_type)
+ if .errors == [] {
+ del(.errors)
+ }
+ . |= parse_regex!(.file, r'^/stackable/log/(?P<container>.*?)/(?P<file>.*?)$')
+
+ filtered_logs_vector:
+ inputs:
+ - vector
+ type: filter
+ condition: '!includes(["TRACE", "DEBUG"], .metadata.level)'
+
+ extended_logs_vector:
+ inputs:
+ - filtered_logs_vector
+ type: remap
+ source: |
+ .container = "vector"
+ .level = .metadata.level
+ .logger = .metadata.module_path
+ if exists(.file) { .processed_file = del(.file) }
+ del(.metadata)
+ del(.pid)
+ del(.source_type)
+
+ extended_logs:
+ inputs:
+ - extended_logs_*
+ type: remap
+ source: |
+ .namespace = "$NAMESPACE"
+ .cluster = "hdfs"
+ .role = "namenode"
+ .roleGroup = "default"
+
+ sinks:
+ aggregator:
+ inputs:
+ - extended_logs
+ type: vector
+ address: $VECTOR_AGGREGATOR_ADDRESS
+{% endif -%}
+zkfc.log4j.properties: |+
+ log4j.rootLogger=INFO, CONSOLE, FILE
+
+ log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+ log4j.appender.CONSOLE.Threshold=INFO
+ log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+ log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+ log4j.appender.FILE=org.apache.log4j.RollingFileAppender
+ log4j.appender.FILE.Threshold=INFO
+ log4j.appender.FILE.File=/stackable/log/zkfc/zkfc.log4j.xml
+ log4j.appender.FILE.MaxFileSize=5MB
+ log4j.appender.FILE.MaxBackupIndex=1
+ log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout
+