diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4d8e96c3..00cebb8b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,5 +1,8 @@ --- -exclude: ^(Cargo\.nix|crate-hashes\.json|nix/.*)$ +# The files tests/templates/kuttl/smoke/31_configmap_.* define the expected ConfigMap contents. +# New lines at the end of these files are intentional and must not be removed by the pre-commit hook +# "end-of-file-fixer". +exclude: ^(Cargo\.nix|crate-hashes\.json|nix/.*|tests/templates/kuttl/smoke/31_configmap_.*)$ default_language_version: node: system diff --git a/tests/templates/kuttl/smoke/30-assert.yaml.j2 b/tests/templates/kuttl/smoke/30-assert.yaml.j2 index 1e2c4792..3d71a8ea 100644 --- a/tests/templates/kuttl/smoke/30-assert.yaml.j2 +++ b/tests/templates/kuttl/smoke/30-assert.yaml.j2 @@ -405,6 +405,45 @@ spec: name: listener - mountPath: /stackable/data name: data +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + - args: + - | + mkdir --parents /stackable/log/_vector-state + # Vector will ignore SIGTERM (as PID != 1) and must be shut down by writing a shutdown trigger file + vector --config /stackable/config/vector.yaml & vector_pid=$! + if [ ! -f "/stackable/log/_vector/shutdown" ]; then + mkdir -p /stackable/log/_vector && inotifywait -qq --event create /stackable/log/_vector; fi + sleep 1 + kill $vector_pid + command: + - /bin/bash + - -x + - -euo + - pipefail + - -c + env: + - name: VECTOR_LOG + value: info + - name: VECTOR_AGGREGATOR_ADDRESS + valueFrom: + configMapKeyRef: + key: ADDRESS + name: vector-aggregator-discovery + imagePullPolicy: IfNotPresent + name: vector + resources: + limits: + cpu: 500m + memory: 128Mi + requests: + cpu: 250m + memory: 128Mi + volumeMounts: + - mountPath: /stackable/config + name: hdfs-config + - mountPath: /stackable/log + name: log +{% endif %} - args: - | mkdir -p /stackable/config/zkfc @@ -457,9 +496,6 @@ spec: name: zkfc-config - mountPath: /stackable/mount/log/zkfc name: zkfc-log-config -{% if lookup('env', 'VECTOR_AGGREGATOR') %} - - name: vector -{% endif %} enableServiceLinks: false initContainers: - args: @@ -710,7 +746,9 @@ spec: name: hdfs-namenode-default name: format-zookeeper-log-config volumeClaimTemplates: - - metadata: + - apiVersion: v1 + kind: PersistentVolumeClaim + metadata: name: data spec: accessModes: @@ -719,7 +757,10 @@ spec: requests: storage: 2Gi volumeMode: Filesystem - - metadata: + - apiVersion: v1 + kind: PersistentVolumeClaim + metadata: + name: data annotations: listeners.stackable.tech/listener-class: {{ test_scenario['values']['listener-class'] }} labels: diff --git a/tests/templates/kuttl/smoke/31-assert.yaml.j2 b/tests/templates/kuttl/smoke/31-assert.yaml.j2 index 3e5090d3..3ac31d65 100644 --- a/tests/templates/kuttl/smoke/31-assert.yaml.j2 +++ b/tests/templates/kuttl/smoke/31-assert.yaml.j2 @@ -6,28 +6,20 @@ # envsubst expands only the listed shell variables ($NAMESPACE etc.), # leaving Hadoop ${env.FOO} references untouched. # -# Both sides are converted to JSON via yq, then jq normalises trailing -# newlines in string values (collapsing \n+ to \n) so YAML block scalar -# style (|, |+) differences don't cause false failures. -# vector.toml is excluded because its presence depends on the -# VECTOR_AGGREGATOR env var set by the CI/test runner. The snapshot -# files are plain YAML (not Jinja2 templates), so they cannot -# conditionally include it. -# # kuttl runs script commands with sh, not bash, so process substitution # is unavailable. Temp files namespaced under $NAMESPACE are used for # diff output to avoid collisions when tests run concurrently. apiVersion: kuttl.dev/v1beta1 kind: TestAssert -timeout: 600 +timeout: 60 commands: # # ConfigMap data snapshot: hdfs-namenode-default # - script: | - envsubst '$NAMESPACE' < 31_configmap_hdfs-namenode-default.yaml | yq -o=json | jq 'with_entries(.value |= sub("\n+$"; "\n"))' > /tmp/$NAMESPACE-expected.json - kubectl -n $NAMESPACE get cm hdfs-namenode-default -o yaml | yq -o=json '.data | del(.["vector.toml"])' | jq 'with_entries(.value |= sub("\n+$"; "\n"))' > /tmp/$NAMESPACE-actual.json - if ! diff /tmp/$NAMESPACE-expected.json /tmp/$NAMESPACE-actual.json; then + envsubst '$NAMESPACE' < 31_configmap_hdfs-namenode-default.yaml > /tmp/$NAMESPACE-expected.yaml + kubectl -n $NAMESPACE get cm hdfs-namenode-default -o yaml | yq '.data' > /tmp/$NAMESPACE-actual.yaml + if ! diff /tmp/$NAMESPACE-expected.yaml /tmp/$NAMESPACE-actual.yaml; then echo "ERROR: ConfigMap hdfs-namenode-default data drifted from snapshot." exit 1 fi @@ -40,9 +32,9 @@ commands: {% else %} export DATANODE_DATA_DIR='[DISK]/stackable/data/data/datanode' {% endif %} - envsubst '$NAMESPACE $DATANODE_DATA_DIR' < 31_configmap_hdfs-datanode-default.yaml | yq -o=json | jq 'with_entries(.value |= sub("\n+$"; "\n"))' > /tmp/$NAMESPACE-expected.json - kubectl -n $NAMESPACE get cm hdfs-datanode-default -o yaml | yq -o=json '.data | del(.["vector.toml"])' | jq 'with_entries(.value |= sub("\n+$"; "\n"))' > /tmp/$NAMESPACE-actual.json - if ! diff /tmp/$NAMESPACE-expected.json /tmp/$NAMESPACE-actual.json; then + envsubst '$NAMESPACE $DATANODE_DATA_DIR' < 31_configmap_hdfs-datanode-default.yaml > /tmp/$NAMESPACE-expected.yaml + kubectl -n $NAMESPACE get cm hdfs-datanode-default -o yaml | yq '.data' > /tmp/$NAMESPACE-actual.yaml + if ! diff /tmp/$NAMESPACE-expected.yaml /tmp/$NAMESPACE-actual.yaml; then echo "ERROR: ConfigMap hdfs-datanode-default data drifted from snapshot." exit 1 fi @@ -50,9 +42,9 @@ commands: # ConfigMap data snapshot: hdfs-journalnode-default # - script: | - envsubst '$NAMESPACE' < 31_configmap_hdfs-journalnode-default.yaml | yq -o=json | jq 'with_entries(.value |= sub("\n+$"; "\n"))' > /tmp/$NAMESPACE-expected.json - kubectl -n $NAMESPACE get cm hdfs-journalnode-default -o yaml | yq -o=json '.data | del(.["vector.toml"])' | jq 'with_entries(.value |= sub("\n+$"; "\n"))' > /tmp/$NAMESPACE-actual.json - if ! diff /tmp/$NAMESPACE-expected.json /tmp/$NAMESPACE-actual.json; then + envsubst '$NAMESPACE' < 31_configmap_hdfs-journalnode-default.yaml > /tmp/$NAMESPACE-expected.yaml + kubectl -n $NAMESPACE get cm hdfs-journalnode-default -o yaml | yq '.data' > /tmp/$NAMESPACE-actual.yaml + if ! diff /tmp/$NAMESPACE-expected.yaml /tmp/$NAMESPACE-actual.yaml; then echo "ERROR: ConfigMap hdfs-journalnode-default data drifted from snapshot." exit 1 fi diff --git a/tests/templates/kuttl/smoke/31_configmap_hdfs-datanode-default.yaml b/tests/templates/kuttl/smoke/31_configmap_hdfs-datanode-default.yaml deleted file mode 100644 index 909a6494..00000000 --- a/tests/templates/kuttl/smoke/31_configmap_hdfs-datanode-default.yaml +++ /dev/null @@ -1,185 +0,0 @@ ---- -core-site.xml: |- - - - - fs.defaultFS - hdfs://hdfs/ - - - ha.zookeeper.quorum - ${env.ZOOKEEPER} - - - hadoop.prometheus.endpoint.enabled - true - - - io.file.buffer.size - 131072 - - -hadoop-policy.xml: |- - - - -hdfs-site.xml: |- - - - - dfs.client.failover.proxy.provider.hdfs - org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider - - - dfs.datanode.data.dir - $DATANODE_DATA_DIR - - - dfs.datanode.handler.count - 50 - - - dfs.datanode.max.transfer.threads - 8192 - - - dfs.datanode.registered.hostname - ${env.POD_ADDRESS} - - - dfs.datanode.registered.http.port - ${env.HTTP_PORT} - - - dfs.datanode.registered.ipc.port - ${env.IPC_PORT} - - - dfs.datanode.registered.port - ${env.DATA_PORT} - - - dfs.datanode.sync.behind.writes - true - - - dfs.datanode.synconclose - true - - - dfs.ha.automatic-failover.enabled - true - - - dfs.ha.fencing.methods - shell(/bin/true) - - - dfs.ha.namenode.id - ${env.POD_NAME} - - - dfs.ha.namenodes.hdfs - hdfs-namenode-default-0,hdfs-namenode-default-1 - - - dfs.journalnode.edits.dir - /stackable/data/journalnode - - - dfs.namenode.datanode.registration.unsafe.allow-address-override - true - - - dfs.namenode.handler.count - 50 - - - dfs.namenode.http-address.hdfs.hdfs-namenode-default-0 - hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870 - - - dfs.namenode.http-address.hdfs.hdfs-namenode-default-1 - hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870 - - - dfs.namenode.name.dir - /stackable/data/namenode - - - dfs.namenode.name.dir.hdfs.hdfs-namenode-default-0 - /stackable/data/namenode - - - dfs.namenode.name.dir.hdfs.hdfs-namenode-default-1 - /stackable/data/namenode - - - dfs.namenode.replication.max-streams - 4 - - - dfs.namenode.replication.max-streams-hard-limit - 8 - - - dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-0 - hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020 - - - dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-1 - hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020 - - - dfs.namenode.shared.edits.dir - qjournal://hdfs-journalnode-default-0.hdfs-journalnode-default.$NAMESPACE.svc.cluster.local:8485/hdfs - - - dfs.nameservices - hdfs - - - dfs.replication - 1 - - -hdfs.log4j.properties: |+ - log4j.rootLogger=INFO, CONSOLE, FILE - - log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender - log4j.appender.CONSOLE.Threshold=INFO - log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout - log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n - - log4j.appender.FILE=org.apache.log4j.RollingFileAppender - log4j.appender.FILE.Threshold=INFO - log4j.appender.FILE.File=/stackable/log/hdfs/hdfs.log4j.xml - log4j.appender.FILE.MaxFileSize=5MB - log4j.appender.FILE.MaxBackupIndex=1 - log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout - -security.properties: | - networkaddress.cache.negative.ttl=0 - networkaddress.cache.ttl=30 -ssl-client.xml: |- - - - -ssl-server.xml: |- - - - -wait-for-namenodes.log4j.properties: |+ - log4j.rootLogger=INFO, CONSOLE, FILE - - log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender - log4j.appender.CONSOLE.Threshold=INFO - log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout - log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n - - log4j.appender.FILE=org.apache.log4j.RollingFileAppender - log4j.appender.FILE.Threshold=INFO - log4j.appender.FILE.File=/stackable/log/wait-for-namenodes/wait-for-namenodes.log4j.xml - log4j.appender.FILE.MaxFileSize=5MB - log4j.appender.FILE.MaxBackupIndex=1 - log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout diff --git a/tests/templates/kuttl/smoke/31_configmap_hdfs-datanode-default.yaml.j2 b/tests/templates/kuttl/smoke/31_configmap_hdfs-datanode-default.yaml.j2 new file mode 100644 index 00000000..aecd285e --- /dev/null +++ b/tests/templates/kuttl/smoke/31_configmap_hdfs-datanode-default.yaml.j2 @@ -0,0 +1,785 @@ +core-site.xml: |- + + + + fs.defaultFS + hdfs://hdfs/ + + + ha.zookeeper.quorum + ${env.ZOOKEEPER} + + + hadoop.prometheus.endpoint.enabled + true + + + io.file.buffer.size + 131072 + + +hadoop-policy.xml: |- + + + +hdfs-site.xml: |- + + + + dfs.client.failover.proxy.provider.hdfs + org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider + + + dfs.datanode.data.dir + $DATANODE_DATA_DIR + + + dfs.datanode.handler.count + 50 + + + dfs.datanode.max.transfer.threads + 8192 + + + dfs.datanode.registered.hostname + ${env.POD_ADDRESS} + + + dfs.datanode.registered.http.port + ${env.HTTP_PORT} + + + dfs.datanode.registered.ipc.port + ${env.IPC_PORT} + + + dfs.datanode.registered.port + ${env.DATA_PORT} + + + dfs.datanode.sync.behind.writes + true + + + dfs.datanode.synconclose + true + + + dfs.ha.automatic-failover.enabled + true + + + dfs.ha.fencing.methods + shell(/bin/true) + + + dfs.ha.namenode.id + ${env.POD_NAME} + + + dfs.ha.namenodes.hdfs + hdfs-namenode-default-0,hdfs-namenode-default-1 + + + dfs.journalnode.edits.dir + /stackable/data/journalnode + + + dfs.namenode.datanode.registration.unsafe.allow-address-override + true + + + dfs.namenode.handler.count + 50 + + + dfs.namenode.http-address.hdfs.hdfs-namenode-default-0 + hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870 + + + dfs.namenode.http-address.hdfs.hdfs-namenode-default-1 + hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870 + + + dfs.namenode.name.dir + /stackable/data/namenode + + + dfs.namenode.name.dir.hdfs.hdfs-namenode-default-0 + /stackable/data/namenode + + + dfs.namenode.name.dir.hdfs.hdfs-namenode-default-1 + /stackable/data/namenode + + + dfs.namenode.replication.max-streams + 4 + + + dfs.namenode.replication.max-streams-hard-limit + 8 + + + dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-0 + hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020 + + + dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-1 + hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020 + + + dfs.namenode.shared.edits.dir + qjournal://hdfs-journalnode-default-0.hdfs-journalnode-default.$NAMESPACE.svc.cluster.local:8485/hdfs + + + dfs.nameservices + hdfs + + + dfs.replication + 1 + + +hdfs.log4j.properties: |+ + log4j.rootLogger=INFO, CONSOLE, FILE + + log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender + log4j.appender.CONSOLE.Threshold=INFO + log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout + log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + log4j.appender.FILE=org.apache.log4j.RollingFileAppender + log4j.appender.FILE.Threshold=INFO + log4j.appender.FILE.File=/stackable/log/hdfs/hdfs.log4j.xml + log4j.appender.FILE.MaxFileSize=5MB + log4j.appender.FILE.MaxBackupIndex=1 + log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout + +security.properties: | + networkaddress.cache.negative.ttl=0 + networkaddress.cache.ttl=30 +ssl-client.xml: |- + + + +ssl-server.xml: |- + + + +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +vector.yaml: | + data_dir: /stackable/log/_vector-state + + log_schema: + host_key: pod + + sources: + vector: + type: internal_logs + + files_stdout: + type: file + include: + - /stackable/log/*/*.stdout.log + + files_stderr: + type: file + include: + - /stackable/log/*/*.stderr.log + + files_log4j: + type: file + include: + - /stackable/log/*/*.log4j.xml + line_delimiter: "\r\n" + multiline: + mode: halt_before + start_pattern: ^" + raw_message + "" + parsed_event, err = parse_xml(wrapped_xml_event) + if err != null { + error = "XML not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + root = object!(parsed_event.root) + if !is_object(root.event) { + error = "Parsed event contains no \"event\" tag." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + if keys(root) != ["event"] { + .errors = push(.errors, "Parsed event contains multiple tags: " + join!(keys(root), ", ")) + } + event = object!(root.event) + + epoch_milliseconds, err = to_int(event.@timestamp) + if err == null && epoch_milliseconds != 0 { + converted_timestamp, err = from_unix_timestamp(epoch_milliseconds, "milliseconds") + if err == null { + .timestamp = converted_timestamp + } else { + .errors = push(.errors, "Time not parsable, using current time instead: " + err) + } + } else { + .errors = push(.errors, "Timestamp not found, using current time instead.") + } + + .logger, err = string(event.@logger) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.@level) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } else { + .level = level + } + + message, err = string(event.message) + if err != null || is_empty(message) { + .errors = push(.errors, "Message not found.") + } + throwable = string(event.throwable) ?? "" + .message = join!(compact([message, throwable]), "\n") + } + } + + processed_files_log4j2: + inputs: + - files_log4j2 + type: remap + source: | + raw_message = string!(.message) + + .timestamp = now() + .logger = "" + .level = "INFO" + .message = "" + .errors = [] + + event = {} + parsed_event, err = parse_xml(raw_message) + if err != null { + error = "XML not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + if !is_object(parsed_event.Event) { + error = "Parsed event contains no \"Event\" tag." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + event = object!(parsed_event.Event) + + tag_instant_valid = false + instant, err = object(event.Instant) + if err == null { + epoch_nanoseconds, err = to_int(instant.@epochSecond) * 1_000_000_000 + to_int(instant.@nanoOfSecond) + if err == null && epoch_nanoseconds != 0 { + converted_timestamp, err = from_unix_timestamp(epoch_nanoseconds, "nanoseconds") + if err == null { + .timestamp = converted_timestamp + tag_instant_valid = true + } else { + .errors = push(.errors, "Instant invalid, trying property timeMillis instead: " + err) + } + } else { + .errors = push(.errors, "Instant invalid, trying property timeMillis instead: " + err) + } + } + if !tag_instant_valid { + epoch_milliseconds, err = to_int(event.@timeMillis) + if err == null && epoch_milliseconds != 0 { + converted_timestamp, err = from_unix_timestamp(epoch_milliseconds, "milliseconds") + if err == null { + .timestamp = converted_timestamp + } else { + .errors = push(.errors, "timeMillis not parsable, using current time instead: " + err) + } + } else { + .errors = push(.errors, "timeMillis not parsable, using current time instead: " + err) + } + } + + .logger, err = string(event.@loggerName) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.@level) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } else { + .level = level + } + + exception = null + thrown = event.Thrown + if is_object(thrown) { + exception = "Exception" + thread, err = string(event.@thread) + if err == null && !is_empty(thread) { + exception = exception + " in thread \"" + thread + "\"" + } + thrown_name, err = string(thrown.@name) + if err == null && !is_empty(exception) { + exception = exception + " " + thrown_name + } + message = string(thrown.@localizedMessage) ?? + string(thrown.@message) ?? + "" + if !is_empty(message) { + exception = exception + ": " + message + } + stacktrace_items = array(thrown.ExtendedStackTrace.ExtendedStackTraceItem) ?? [] + stacktrace = "" + for_each(stacktrace_items) -> |_index, value| { + stacktrace = stacktrace + " " + class = string(value.@class) ?? "" + method = string(value.@method) ?? "" + if !is_empty(class) && !is_empty(method) { + stacktrace = stacktrace + "at " + class + "." + method + } + file = string(value.@file) ?? "" + line = string(value.@line) ?? "" + if !is_empty(file) && !is_empty(line) { + stacktrace = stacktrace + "(" + file + ":" + line + ")" + } + exact = to_bool(value.@exact) ?? false + location = string(value.@location) ?? "" + version = string(value.@version) ?? "" + if !is_empty(location) && !is_empty(version) { + stacktrace = stacktrace + " " + if !exact { + stacktrace = stacktrace + "~" + } + stacktrace = stacktrace + "[" + location + ":" + version + "]" + } + stacktrace = stacktrace + "\n" + } + if stacktrace != "" { + exception = exception + "\n" + stacktrace + } + } + + message, err = string(event.Message) + if err != null || is_empty(message) { + message = null + .errors = push(.errors, "Message not found.") + } + .message = join!(compact([message, exception]), "\n") + } + } + + processed_files_py: + inputs: + - files_py + type: remap + source: | + raw_message = string!(.message) + + .timestamp = now() + .logger = "" + .level = "INFO" + .message = "" + .errors = [] + + parsed_event, err = parse_json(raw_message) + if err != null { + error = "JSON not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else if !is_object(parsed_event) { + error = "Parsed event is not a JSON object." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + event = object!(parsed_event) + + asctime, err = string(event.asctime) + if err == null { + parsed_timestamp, err = parse_timestamp(asctime, "%F %T,%3f") + if err == null { + .timestamp = parsed_timestamp + } else { + .errors = push(.errors, "Timestamp not parsable, using current time instead: "+ err) + } + } else { + .errors = push(.errors, "Timestamp not found, using current time instead.") + } + + .logger, err = string(event.name) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.levelname) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if level == "DEBUG" { + .level = "DEBUG" + } else if level == "INFO" { + .level = "INFO" + } else if level == "WARNING" { + .level = "WARN" + } else if level == "ERROR" { + .level = "ERROR" + } else if level == "CRITICAL" { + .level = "FATAL" + } else { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } + + .message, err = string(event.message) + if err != null || is_empty(.message) { + .errors = push(.errors, "Message not found.") + } + } + + processed_files_airlift: + inputs: + - files_airlift + type: remap + source: | + raw_message = string!(.message) + + .timestamp = now() + .logger = "" + .level = "INFO" + .message = "" + .errors = [] + + parsed_event, err = parse_json(raw_message) + if err != null { + error = "JSON not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else if !is_object(parsed_event) { + error = "Parsed event is not a JSON object." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + event = object!(parsed_event) + + timestamp_string, err = string(event.timestamp) + if err == null { + parsed_timestamp, err = parse_timestamp(timestamp_string, "%Y-%m-%dT%H:%M:%S.%fZ") + if err == null { + .timestamp = parsed_timestamp + } else { + .errors = push(.errors, "Timestamp not parsable, using current time instead: " + err) + } + } else { + .errors = push(.errors, "Timestamp not found, using current time instead.") + } + + .logger, err = string(event.logger) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.level) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } else { + .level = level + } + + .thread = string(parsed_event.thread) ?? null + + .message, err = string(event.message) + if err != null || is_empty(.message) { + .errors = push(.errors, "Message not found.") + } + stacktrace = string(event.stackTrace) ?? "" + .message = join!(compact([.message, stacktrace]), "\n\n") + } + + extended_logs_files: + inputs: + - processed_files_* + type: remap + source: | + del(.source_type) + if .errors == [] { + del(.errors) + } + . |= parse_regex!(.file, r'^/stackable/log/(?P.*?)/(?P.*?)$') + + filtered_logs_vector: + inputs: + - vector + type: filter + condition: '!includes(["TRACE", "DEBUG"], .metadata.level)' + + extended_logs_vector: + inputs: + - filtered_logs_vector + type: remap + source: | + .container = "vector" + .level = .metadata.level + .logger = .metadata.module_path + if exists(.file) { .processed_file = del(.file) } + del(.metadata) + del(.pid) + del(.source_type) + + extended_logs: + inputs: + - extended_logs_* + type: remap + source: | + .namespace = "$NAMESPACE" + .cluster = "hdfs" + .role = "datanode" + .roleGroup = "default" + + sinks: + aggregator: + inputs: + - extended_logs + type: vector + address: $VECTOR_AGGREGATOR_ADDRESS +{% endif -%} +wait-for-namenodes.log4j.properties: |+ + log4j.rootLogger=INFO, CONSOLE, FILE + + log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender + log4j.appender.CONSOLE.Threshold=INFO + log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout + log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + log4j.appender.FILE=org.apache.log4j.RollingFileAppender + log4j.appender.FILE.Threshold=INFO + log4j.appender.FILE.File=/stackable/log/wait-for-namenodes/wait-for-namenodes.log4j.xml + log4j.appender.FILE.MaxFileSize=5MB + log4j.appender.FILE.MaxBackupIndex=1 + log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout + diff --git a/tests/templates/kuttl/smoke/31_configmap_hdfs-journalnode-default.yaml b/tests/templates/kuttl/smoke/31_configmap_hdfs-journalnode-default.yaml deleted file mode 100644 index 8600ee57..00000000 --- a/tests/templates/kuttl/smoke/31_configmap_hdfs-journalnode-default.yaml +++ /dev/null @@ -1,167 +0,0 @@ ---- -core-site.xml: |- - - - - fs.defaultFS - hdfs://hdfs/ - - - ha.zookeeper.quorum - ${env.ZOOKEEPER} - - - hadoop.prometheus.endpoint.enabled - true - - - io.file.buffer.size - 131072 - - -hadoop-policy.xml: |- - - - -hdfs-site.xml: |- - - - - dfs.client.failover.proxy.provider.hdfs - org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider - - - dfs.datanode.handler.count - 50 - - - dfs.datanode.max.transfer.threads - 8192 - - - dfs.datanode.registered.hostname - ${env.POD_ADDRESS} - - - dfs.datanode.registered.http.port - ${env.HTTP_PORT} - - - dfs.datanode.registered.ipc.port - ${env.IPC_PORT} - - - dfs.datanode.registered.port - ${env.DATA_PORT} - - - dfs.datanode.sync.behind.writes - true - - - dfs.datanode.synconclose - true - - - dfs.ha.automatic-failover.enabled - true - - - dfs.ha.fencing.methods - shell(/bin/true) - - - dfs.ha.namenode.id - ${env.POD_NAME} - - - dfs.ha.namenodes.hdfs - hdfs-namenode-default-0,hdfs-namenode-default-1 - - - dfs.journalnode.edits.dir - /stackable/data/journalnode - - - dfs.namenode.datanode.registration.unsafe.allow-address-override - true - - - dfs.namenode.handler.count - 50 - - - dfs.namenode.http-address.hdfs.hdfs-namenode-default-0 - hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870 - - - dfs.namenode.http-address.hdfs.hdfs-namenode-default-1 - hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870 - - - dfs.namenode.name.dir - /stackable/data/namenode - - - dfs.namenode.name.dir.hdfs.hdfs-namenode-default-0 - /stackable/data/namenode - - - dfs.namenode.name.dir.hdfs.hdfs-namenode-default-1 - /stackable/data/namenode - - - dfs.namenode.replication.max-streams - 4 - - - dfs.namenode.replication.max-streams-hard-limit - 8 - - - dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-0 - hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020 - - - dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-1 - hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020 - - - dfs.namenode.shared.edits.dir - qjournal://hdfs-journalnode-default-0.hdfs-journalnode-default.$NAMESPACE.svc.cluster.local:8485/hdfs - - - dfs.nameservices - hdfs - - - dfs.replication - 1 - - -hdfs.log4j.properties: |+ - log4j.rootLogger=INFO, CONSOLE, FILE - - log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender - log4j.appender.CONSOLE.Threshold=INFO - log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout - log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n - - log4j.appender.FILE=org.apache.log4j.RollingFileAppender - log4j.appender.FILE.Threshold=INFO - log4j.appender.FILE.File=/stackable/log/hdfs/hdfs.log4j.xml - log4j.appender.FILE.MaxFileSize=5MB - log4j.appender.FILE.MaxBackupIndex=1 - log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout - -security.properties: | - networkaddress.cache.negative.ttl=0 - networkaddress.cache.ttl=30 -ssl-client.xml: |- - - - -ssl-server.xml: |- - - - diff --git a/tests/templates/kuttl/smoke/31_configmap_hdfs-journalnode-default.yaml.j2 b/tests/templates/kuttl/smoke/31_configmap_hdfs-journalnode-default.yaml.j2 new file mode 100644 index 00000000..fdc777f8 --- /dev/null +++ b/tests/templates/kuttl/smoke/31_configmap_hdfs-journalnode-default.yaml.j2 @@ -0,0 +1,767 @@ +core-site.xml: |- + + + + fs.defaultFS + hdfs://hdfs/ + + + ha.zookeeper.quorum + ${env.ZOOKEEPER} + + + hadoop.prometheus.endpoint.enabled + true + + + io.file.buffer.size + 131072 + + +hadoop-policy.xml: |- + + + +hdfs-site.xml: |- + + + + dfs.client.failover.proxy.provider.hdfs + org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider + + + dfs.datanode.handler.count + 50 + + + dfs.datanode.max.transfer.threads + 8192 + + + dfs.datanode.registered.hostname + ${env.POD_ADDRESS} + + + dfs.datanode.registered.http.port + ${env.HTTP_PORT} + + + dfs.datanode.registered.ipc.port + ${env.IPC_PORT} + + + dfs.datanode.registered.port + ${env.DATA_PORT} + + + dfs.datanode.sync.behind.writes + true + + + dfs.datanode.synconclose + true + + + dfs.ha.automatic-failover.enabled + true + + + dfs.ha.fencing.methods + shell(/bin/true) + + + dfs.ha.namenode.id + ${env.POD_NAME} + + + dfs.ha.namenodes.hdfs + hdfs-namenode-default-0,hdfs-namenode-default-1 + + + dfs.journalnode.edits.dir + /stackable/data/journalnode + + + dfs.namenode.datanode.registration.unsafe.allow-address-override + true + + + dfs.namenode.handler.count + 50 + + + dfs.namenode.http-address.hdfs.hdfs-namenode-default-0 + hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870 + + + dfs.namenode.http-address.hdfs.hdfs-namenode-default-1 + hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870 + + + dfs.namenode.name.dir + /stackable/data/namenode + + + dfs.namenode.name.dir.hdfs.hdfs-namenode-default-0 + /stackable/data/namenode + + + dfs.namenode.name.dir.hdfs.hdfs-namenode-default-1 + /stackable/data/namenode + + + dfs.namenode.replication.max-streams + 4 + + + dfs.namenode.replication.max-streams-hard-limit + 8 + + + dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-0 + hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020 + + + dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-1 + hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020 + + + dfs.namenode.shared.edits.dir + qjournal://hdfs-journalnode-default-0.hdfs-journalnode-default.$NAMESPACE.svc.cluster.local:8485/hdfs + + + dfs.nameservices + hdfs + + + dfs.replication + 1 + + +hdfs.log4j.properties: |+ + log4j.rootLogger=INFO, CONSOLE, FILE + + log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender + log4j.appender.CONSOLE.Threshold=INFO + log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout + log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + log4j.appender.FILE=org.apache.log4j.RollingFileAppender + log4j.appender.FILE.Threshold=INFO + log4j.appender.FILE.File=/stackable/log/hdfs/hdfs.log4j.xml + log4j.appender.FILE.MaxFileSize=5MB + log4j.appender.FILE.MaxBackupIndex=1 + log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout + +security.properties: | + networkaddress.cache.negative.ttl=0 + networkaddress.cache.ttl=30 +ssl-client.xml: |- + + + +ssl-server.xml: |- + + + +{%- if lookup('env', 'VECTOR_AGGREGATOR') %} + +vector.yaml: | + data_dir: /stackable/log/_vector-state + + log_schema: + host_key: pod + + sources: + vector: + type: internal_logs + + files_stdout: + type: file + include: + - /stackable/log/*/*.stdout.log + + files_stderr: + type: file + include: + - /stackable/log/*/*.stderr.log + + files_log4j: + type: file + include: + - /stackable/log/*/*.log4j.xml + line_delimiter: "\r\n" + multiline: + mode: halt_before + start_pattern: ^" + raw_message + "" + parsed_event, err = parse_xml(wrapped_xml_event) + if err != null { + error = "XML not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + root = object!(parsed_event.root) + if !is_object(root.event) { + error = "Parsed event contains no \"event\" tag." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + if keys(root) != ["event"] { + .errors = push(.errors, "Parsed event contains multiple tags: " + join!(keys(root), ", ")) + } + event = object!(root.event) + + epoch_milliseconds, err = to_int(event.@timestamp) + if err == null && epoch_milliseconds != 0 { + converted_timestamp, err = from_unix_timestamp(epoch_milliseconds, "milliseconds") + if err == null { + .timestamp = converted_timestamp + } else { + .errors = push(.errors, "Time not parsable, using current time instead: " + err) + } + } else { + .errors = push(.errors, "Timestamp not found, using current time instead.") + } + + .logger, err = string(event.@logger) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.@level) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } else { + .level = level + } + + message, err = string(event.message) + if err != null || is_empty(message) { + .errors = push(.errors, "Message not found.") + } + throwable = string(event.throwable) ?? "" + .message = join!(compact([message, throwable]), "\n") + } + } + + processed_files_log4j2: + inputs: + - files_log4j2 + type: remap + source: | + raw_message = string!(.message) + + .timestamp = now() + .logger = "" + .level = "INFO" + .message = "" + .errors = [] + + event = {} + parsed_event, err = parse_xml(raw_message) + if err != null { + error = "XML not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + if !is_object(parsed_event.Event) { + error = "Parsed event contains no \"Event\" tag." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + event = object!(parsed_event.Event) + + tag_instant_valid = false + instant, err = object(event.Instant) + if err == null { + epoch_nanoseconds, err = to_int(instant.@epochSecond) * 1_000_000_000 + to_int(instant.@nanoOfSecond) + if err == null && epoch_nanoseconds != 0 { + converted_timestamp, err = from_unix_timestamp(epoch_nanoseconds, "nanoseconds") + if err == null { + .timestamp = converted_timestamp + tag_instant_valid = true + } else { + .errors = push(.errors, "Instant invalid, trying property timeMillis instead: " + err) + } + } else { + .errors = push(.errors, "Instant invalid, trying property timeMillis instead: " + err) + } + } + if !tag_instant_valid { + epoch_milliseconds, err = to_int(event.@timeMillis) + if err == null && epoch_milliseconds != 0 { + converted_timestamp, err = from_unix_timestamp(epoch_milliseconds, "milliseconds") + if err == null { + .timestamp = converted_timestamp + } else { + .errors = push(.errors, "timeMillis not parsable, using current time instead: " + err) + } + } else { + .errors = push(.errors, "timeMillis not parsable, using current time instead: " + err) + } + } + + .logger, err = string(event.@loggerName) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.@level) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } else { + .level = level + } + + exception = null + thrown = event.Thrown + if is_object(thrown) { + exception = "Exception" + thread, err = string(event.@thread) + if err == null && !is_empty(thread) { + exception = exception + " in thread \"" + thread + "\"" + } + thrown_name, err = string(thrown.@name) + if err == null && !is_empty(exception) { + exception = exception + " " + thrown_name + } + message = string(thrown.@localizedMessage) ?? + string(thrown.@message) ?? + "" + if !is_empty(message) { + exception = exception + ": " + message + } + stacktrace_items = array(thrown.ExtendedStackTrace.ExtendedStackTraceItem) ?? [] + stacktrace = "" + for_each(stacktrace_items) -> |_index, value| { + stacktrace = stacktrace + " " + class = string(value.@class) ?? "" + method = string(value.@method) ?? "" + if !is_empty(class) && !is_empty(method) { + stacktrace = stacktrace + "at " + class + "." + method + } + file = string(value.@file) ?? "" + line = string(value.@line) ?? "" + if !is_empty(file) && !is_empty(line) { + stacktrace = stacktrace + "(" + file + ":" + line + ")" + } + exact = to_bool(value.@exact) ?? false + location = string(value.@location) ?? "" + version = string(value.@version) ?? "" + if !is_empty(location) && !is_empty(version) { + stacktrace = stacktrace + " " + if !exact { + stacktrace = stacktrace + "~" + } + stacktrace = stacktrace + "[" + location + ":" + version + "]" + } + stacktrace = stacktrace + "\n" + } + if stacktrace != "" { + exception = exception + "\n" + stacktrace + } + } + + message, err = string(event.Message) + if err != null || is_empty(message) { + message = null + .errors = push(.errors, "Message not found.") + } + .message = join!(compact([message, exception]), "\n") + } + } + + processed_files_py: + inputs: + - files_py + type: remap + source: | + raw_message = string!(.message) + + .timestamp = now() + .logger = "" + .level = "INFO" + .message = "" + .errors = [] + + parsed_event, err = parse_json(raw_message) + if err != null { + error = "JSON not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else if !is_object(parsed_event) { + error = "Parsed event is not a JSON object." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + event = object!(parsed_event) + + asctime, err = string(event.asctime) + if err == null { + parsed_timestamp, err = parse_timestamp(asctime, "%F %T,%3f") + if err == null { + .timestamp = parsed_timestamp + } else { + .errors = push(.errors, "Timestamp not parsable, using current time instead: "+ err) + } + } else { + .errors = push(.errors, "Timestamp not found, using current time instead.") + } + + .logger, err = string(event.name) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.levelname) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if level == "DEBUG" { + .level = "DEBUG" + } else if level == "INFO" { + .level = "INFO" + } else if level == "WARNING" { + .level = "WARN" + } else if level == "ERROR" { + .level = "ERROR" + } else if level == "CRITICAL" { + .level = "FATAL" + } else { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } + + .message, err = string(event.message) + if err != null || is_empty(.message) { + .errors = push(.errors, "Message not found.") + } + } + + processed_files_airlift: + inputs: + - files_airlift + type: remap + source: | + raw_message = string!(.message) + + .timestamp = now() + .logger = "" + .level = "INFO" + .message = "" + .errors = [] + + parsed_event, err = parse_json(raw_message) + if err != null { + error = "JSON not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else if !is_object(parsed_event) { + error = "Parsed event is not a JSON object." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + event = object!(parsed_event) + + timestamp_string, err = string(event.timestamp) + if err == null { + parsed_timestamp, err = parse_timestamp(timestamp_string, "%Y-%m-%dT%H:%M:%S.%fZ") + if err == null { + .timestamp = parsed_timestamp + } else { + .errors = push(.errors, "Timestamp not parsable, using current time instead: " + err) + } + } else { + .errors = push(.errors, "Timestamp not found, using current time instead.") + } + + .logger, err = string(event.logger) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.level) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } else { + .level = level + } + + .thread = string(parsed_event.thread) ?? null + + .message, err = string(event.message) + if err != null || is_empty(.message) { + .errors = push(.errors, "Message not found.") + } + stacktrace = string(event.stackTrace) ?? "" + .message = join!(compact([.message, stacktrace]), "\n\n") + } + + extended_logs_files: + inputs: + - processed_files_* + type: remap + source: | + del(.source_type) + if .errors == [] { + del(.errors) + } + . |= parse_regex!(.file, r'^/stackable/log/(?P.*?)/(?P.*?)$') + + filtered_logs_vector: + inputs: + - vector + type: filter + condition: '!includes(["TRACE", "DEBUG"], .metadata.level)' + + extended_logs_vector: + inputs: + - filtered_logs_vector + type: remap + source: | + .container = "vector" + .level = .metadata.level + .logger = .metadata.module_path + if exists(.file) { .processed_file = del(.file) } + del(.metadata) + del(.pid) + del(.source_type) + + extended_logs: + inputs: + - extended_logs_* + type: remap + source: | + .namespace = "$NAMESPACE" + .cluster = "hdfs" + .role = "journalnode" + .roleGroup = "default" + + sinks: + aggregator: + inputs: + - extended_logs + type: vector + address: $VECTOR_AGGREGATOR_ADDRESS +{%- endif -%} diff --git a/tests/templates/kuttl/smoke/31_configmap_hdfs-namenode-default.yaml b/tests/templates/kuttl/smoke/31_configmap_hdfs-namenode-default.yaml deleted file mode 100644 index ffb249ae..00000000 --- a/tests/templates/kuttl/smoke/31_configmap_hdfs-namenode-default.yaml +++ /dev/null @@ -1,211 +0,0 @@ ---- -core-site.xml: |- - - - - fs.defaultFS - hdfs://hdfs/ - - - ha.zookeeper.quorum - ${env.ZOOKEEPER} - - - hadoop.prometheus.endpoint.enabled - true - - - io.file.buffer.size - 131072 - - -format-namenodes.log4j.properties: |+ - log4j.rootLogger=INFO, CONSOLE, FILE - - log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender - log4j.appender.CONSOLE.Threshold=INFO - log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout - log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n - - log4j.appender.FILE=org.apache.log4j.RollingFileAppender - log4j.appender.FILE.Threshold=INFO - log4j.appender.FILE.File=/stackable/log/format-namenodes/format-namenodes.log4j.xml - log4j.appender.FILE.MaxFileSize=5MB - log4j.appender.FILE.MaxBackupIndex=1 - log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout - -format-zookeeper.log4j.properties: |+ - log4j.rootLogger=INFO, CONSOLE, FILE - - log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender - log4j.appender.CONSOLE.Threshold=INFO - log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout - log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n - - log4j.appender.FILE=org.apache.log4j.RollingFileAppender - log4j.appender.FILE.Threshold=INFO - log4j.appender.FILE.File=/stackable/log/format-zookeeper/format-zookeeper.log4j.xml - log4j.appender.FILE.MaxFileSize=5MB - log4j.appender.FILE.MaxBackupIndex=1 - log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout - -hadoop-policy.xml: |- - - - -hdfs-site.xml: |- - - - - dfs.client.failover.proxy.provider.hdfs - org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider - - - dfs.datanode.handler.count - 50 - - - dfs.datanode.max.transfer.threads - 8192 - - - dfs.datanode.registered.hostname - ${env.POD_ADDRESS} - - - dfs.datanode.registered.http.port - ${env.HTTP_PORT} - - - dfs.datanode.registered.ipc.port - ${env.IPC_PORT} - - - dfs.datanode.registered.port - ${env.DATA_PORT} - - - dfs.datanode.sync.behind.writes - true - - - dfs.datanode.synconclose - true - - - dfs.ha.automatic-failover.enabled - true - - - dfs.ha.fencing.methods - shell(/bin/true) - - - dfs.ha.namenode.id - ${env.POD_NAME} - - - dfs.ha.namenodes.hdfs - hdfs-namenode-default-0,hdfs-namenode-default-1 - - - dfs.journalnode.edits.dir - /stackable/data/journalnode - - - dfs.namenode.datanode.registration.unsafe.allow-address-override - true - - - dfs.namenode.handler.count - 50 - - - dfs.namenode.http-address.hdfs.hdfs-namenode-default-0 - hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870 - - - dfs.namenode.http-address.hdfs.hdfs-namenode-default-1 - hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870 - - - dfs.namenode.name.dir - /stackable/data/namenode - - - dfs.namenode.name.dir.hdfs.hdfs-namenode-default-0 - /stackable/data/namenode - - - dfs.namenode.name.dir.hdfs.hdfs-namenode-default-1 - /stackable/data/namenode - - - dfs.namenode.replication.max-streams - 4 - - - dfs.namenode.replication.max-streams-hard-limit - 8 - - - dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-0 - hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020 - - - dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-1 - hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020 - - - dfs.namenode.shared.edits.dir - qjournal://hdfs-journalnode-default-0.hdfs-journalnode-default.$NAMESPACE.svc.cluster.local:8485/hdfs - - - dfs.nameservices - hdfs - - - dfs.replication - 1 - - -hdfs.log4j.properties: |+ - log4j.rootLogger=INFO, CONSOLE, FILE - - log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender - log4j.appender.CONSOLE.Threshold=INFO - log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout - log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n - - log4j.appender.FILE=org.apache.log4j.RollingFileAppender - log4j.appender.FILE.Threshold=INFO - log4j.appender.FILE.File=/stackable/log/hdfs/hdfs.log4j.xml - log4j.appender.FILE.MaxFileSize=5MB - log4j.appender.FILE.MaxBackupIndex=1 - log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout - -security.properties: | - networkaddress.cache.negative.ttl=0 - networkaddress.cache.ttl=30 -ssl-client.xml: |- - - - -ssl-server.xml: |- - - - -zkfc.log4j.properties: |+ - log4j.rootLogger=INFO, CONSOLE, FILE - - log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender - log4j.appender.CONSOLE.Threshold=INFO - log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout - log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n - - log4j.appender.FILE=org.apache.log4j.RollingFileAppender - log4j.appender.FILE.Threshold=INFO - log4j.appender.FILE.File=/stackable/log/zkfc/zkfc.log4j.xml - log4j.appender.FILE.MaxFileSize=5MB - log4j.appender.FILE.MaxBackupIndex=1 - log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout diff --git a/tests/templates/kuttl/smoke/31_configmap_hdfs-namenode-default.yaml.j2 b/tests/templates/kuttl/smoke/31_configmap_hdfs-namenode-default.yaml.j2 new file mode 100644 index 00000000..03071e51 --- /dev/null +++ b/tests/templates/kuttl/smoke/31_configmap_hdfs-namenode-default.yaml.j2 @@ -0,0 +1,811 @@ +core-site.xml: |- + + + + fs.defaultFS + hdfs://hdfs/ + + + ha.zookeeper.quorum + ${env.ZOOKEEPER} + + + hadoop.prometheus.endpoint.enabled + true + + + io.file.buffer.size + 131072 + + +format-namenodes.log4j.properties: |+ + log4j.rootLogger=INFO, CONSOLE, FILE + + log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender + log4j.appender.CONSOLE.Threshold=INFO + log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout + log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + log4j.appender.FILE=org.apache.log4j.RollingFileAppender + log4j.appender.FILE.Threshold=INFO + log4j.appender.FILE.File=/stackable/log/format-namenodes/format-namenodes.log4j.xml + log4j.appender.FILE.MaxFileSize=5MB + log4j.appender.FILE.MaxBackupIndex=1 + log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout + +format-zookeeper.log4j.properties: |+ + log4j.rootLogger=INFO, CONSOLE, FILE + + log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender + log4j.appender.CONSOLE.Threshold=INFO + log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout + log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + log4j.appender.FILE=org.apache.log4j.RollingFileAppender + log4j.appender.FILE.Threshold=INFO + log4j.appender.FILE.File=/stackable/log/format-zookeeper/format-zookeeper.log4j.xml + log4j.appender.FILE.MaxFileSize=5MB + log4j.appender.FILE.MaxBackupIndex=1 + log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout + +hadoop-policy.xml: |- + + + +hdfs-site.xml: |- + + + + dfs.client.failover.proxy.provider.hdfs + org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider + + + dfs.datanode.handler.count + 50 + + + dfs.datanode.max.transfer.threads + 8192 + + + dfs.datanode.registered.hostname + ${env.POD_ADDRESS} + + + dfs.datanode.registered.http.port + ${env.HTTP_PORT} + + + dfs.datanode.registered.ipc.port + ${env.IPC_PORT} + + + dfs.datanode.registered.port + ${env.DATA_PORT} + + + dfs.datanode.sync.behind.writes + true + + + dfs.datanode.synconclose + true + + + dfs.ha.automatic-failover.enabled + true + + + dfs.ha.fencing.methods + shell(/bin/true) + + + dfs.ha.namenode.id + ${env.POD_NAME} + + + dfs.ha.namenodes.hdfs + hdfs-namenode-default-0,hdfs-namenode-default-1 + + + dfs.journalnode.edits.dir + /stackable/data/journalnode + + + dfs.namenode.datanode.registration.unsafe.allow-address-override + true + + + dfs.namenode.handler.count + 50 + + + dfs.namenode.http-address.hdfs.hdfs-namenode-default-0 + hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870 + + + dfs.namenode.http-address.hdfs.hdfs-namenode-default-1 + hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:9870 + + + dfs.namenode.name.dir + /stackable/data/namenode + + + dfs.namenode.name.dir.hdfs.hdfs-namenode-default-0 + /stackable/data/namenode + + + dfs.namenode.name.dir.hdfs.hdfs-namenode-default-1 + /stackable/data/namenode + + + dfs.namenode.replication.max-streams + 4 + + + dfs.namenode.replication.max-streams-hard-limit + 8 + + + dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-0 + hdfs-namenode-default-0.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020 + + + dfs.namenode.rpc-address.hdfs.hdfs-namenode-default-1 + hdfs-namenode-default-1.hdfs-namenode-default.$NAMESPACE.svc.cluster.local:8020 + + + dfs.namenode.shared.edits.dir + qjournal://hdfs-journalnode-default-0.hdfs-journalnode-default.$NAMESPACE.svc.cluster.local:8485/hdfs + + + dfs.nameservices + hdfs + + + dfs.replication + 1 + + +hdfs.log4j.properties: |+ + log4j.rootLogger=INFO, CONSOLE, FILE + + log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender + log4j.appender.CONSOLE.Threshold=INFO + log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout + log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + log4j.appender.FILE=org.apache.log4j.RollingFileAppender + log4j.appender.FILE.Threshold=INFO + log4j.appender.FILE.File=/stackable/log/hdfs/hdfs.log4j.xml + log4j.appender.FILE.MaxFileSize=5MB + log4j.appender.FILE.MaxBackupIndex=1 + log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout + +security.properties: | + networkaddress.cache.negative.ttl=0 + networkaddress.cache.ttl=30 +ssl-client.xml: |- + + + +ssl-server.xml: |- + + + +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +vector.yaml: | + data_dir: /stackable/log/_vector-state + + log_schema: + host_key: pod + + sources: + vector: + type: internal_logs + + files_stdout: + type: file + include: + - /stackable/log/*/*.stdout.log + + files_stderr: + type: file + include: + - /stackable/log/*/*.stderr.log + + files_log4j: + type: file + include: + - /stackable/log/*/*.log4j.xml + line_delimiter: "\r\n" + multiline: + mode: halt_before + start_pattern: ^" + raw_message + "" + parsed_event, err = parse_xml(wrapped_xml_event) + if err != null { + error = "XML not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + root = object!(parsed_event.root) + if !is_object(root.event) { + error = "Parsed event contains no \"event\" tag." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + if keys(root) != ["event"] { + .errors = push(.errors, "Parsed event contains multiple tags: " + join!(keys(root), ", ")) + } + event = object!(root.event) + + epoch_milliseconds, err = to_int(event.@timestamp) + if err == null && epoch_milliseconds != 0 { + converted_timestamp, err = from_unix_timestamp(epoch_milliseconds, "milliseconds") + if err == null { + .timestamp = converted_timestamp + } else { + .errors = push(.errors, "Time not parsable, using current time instead: " + err) + } + } else { + .errors = push(.errors, "Timestamp not found, using current time instead.") + } + + .logger, err = string(event.@logger) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.@level) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } else { + .level = level + } + + message, err = string(event.message) + if err != null || is_empty(message) { + .errors = push(.errors, "Message not found.") + } + throwable = string(event.throwable) ?? "" + .message = join!(compact([message, throwable]), "\n") + } + } + + processed_files_log4j2: + inputs: + - files_log4j2 + type: remap + source: | + raw_message = string!(.message) + + .timestamp = now() + .logger = "" + .level = "INFO" + .message = "" + .errors = [] + + event = {} + parsed_event, err = parse_xml(raw_message) + if err != null { + error = "XML not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + if !is_object(parsed_event.Event) { + error = "Parsed event contains no \"Event\" tag." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + event = object!(parsed_event.Event) + + tag_instant_valid = false + instant, err = object(event.Instant) + if err == null { + epoch_nanoseconds, err = to_int(instant.@epochSecond) * 1_000_000_000 + to_int(instant.@nanoOfSecond) + if err == null && epoch_nanoseconds != 0 { + converted_timestamp, err = from_unix_timestamp(epoch_nanoseconds, "nanoseconds") + if err == null { + .timestamp = converted_timestamp + tag_instant_valid = true + } else { + .errors = push(.errors, "Instant invalid, trying property timeMillis instead: " + err) + } + } else { + .errors = push(.errors, "Instant invalid, trying property timeMillis instead: " + err) + } + } + if !tag_instant_valid { + epoch_milliseconds, err = to_int(event.@timeMillis) + if err == null && epoch_milliseconds != 0 { + converted_timestamp, err = from_unix_timestamp(epoch_milliseconds, "milliseconds") + if err == null { + .timestamp = converted_timestamp + } else { + .errors = push(.errors, "timeMillis not parsable, using current time instead: " + err) + } + } else { + .errors = push(.errors, "timeMillis not parsable, using current time instead: " + err) + } + } + + .logger, err = string(event.@loggerName) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.@level) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } else { + .level = level + } + + exception = null + thrown = event.Thrown + if is_object(thrown) { + exception = "Exception" + thread, err = string(event.@thread) + if err == null && !is_empty(thread) { + exception = exception + " in thread \"" + thread + "\"" + } + thrown_name, err = string(thrown.@name) + if err == null && !is_empty(exception) { + exception = exception + " " + thrown_name + } + message = string(thrown.@localizedMessage) ?? + string(thrown.@message) ?? + "" + if !is_empty(message) { + exception = exception + ": " + message + } + stacktrace_items = array(thrown.ExtendedStackTrace.ExtendedStackTraceItem) ?? [] + stacktrace = "" + for_each(stacktrace_items) -> |_index, value| { + stacktrace = stacktrace + " " + class = string(value.@class) ?? "" + method = string(value.@method) ?? "" + if !is_empty(class) && !is_empty(method) { + stacktrace = stacktrace + "at " + class + "." + method + } + file = string(value.@file) ?? "" + line = string(value.@line) ?? "" + if !is_empty(file) && !is_empty(line) { + stacktrace = stacktrace + "(" + file + ":" + line + ")" + } + exact = to_bool(value.@exact) ?? false + location = string(value.@location) ?? "" + version = string(value.@version) ?? "" + if !is_empty(location) && !is_empty(version) { + stacktrace = stacktrace + " " + if !exact { + stacktrace = stacktrace + "~" + } + stacktrace = stacktrace + "[" + location + ":" + version + "]" + } + stacktrace = stacktrace + "\n" + } + if stacktrace != "" { + exception = exception + "\n" + stacktrace + } + } + + message, err = string(event.Message) + if err != null || is_empty(message) { + message = null + .errors = push(.errors, "Message not found.") + } + .message = join!(compact([message, exception]), "\n") + } + } + + processed_files_py: + inputs: + - files_py + type: remap + source: | + raw_message = string!(.message) + + .timestamp = now() + .logger = "" + .level = "INFO" + .message = "" + .errors = [] + + parsed_event, err = parse_json(raw_message) + if err != null { + error = "JSON not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else if !is_object(parsed_event) { + error = "Parsed event is not a JSON object." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + event = object!(parsed_event) + + asctime, err = string(event.asctime) + if err == null { + parsed_timestamp, err = parse_timestamp(asctime, "%F %T,%3f") + if err == null { + .timestamp = parsed_timestamp + } else { + .errors = push(.errors, "Timestamp not parsable, using current time instead: "+ err) + } + } else { + .errors = push(.errors, "Timestamp not found, using current time instead.") + } + + .logger, err = string(event.name) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.levelname) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if level == "DEBUG" { + .level = "DEBUG" + } else if level == "INFO" { + .level = "INFO" + } else if level == "WARNING" { + .level = "WARN" + } else if level == "ERROR" { + .level = "ERROR" + } else if level == "CRITICAL" { + .level = "FATAL" + } else { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } + + .message, err = string(event.message) + if err != null || is_empty(.message) { + .errors = push(.errors, "Message not found.") + } + } + + processed_files_airlift: + inputs: + - files_airlift + type: remap + source: | + raw_message = string!(.message) + + .timestamp = now() + .logger = "" + .level = "INFO" + .message = "" + .errors = [] + + parsed_event, err = parse_json(raw_message) + if err != null { + error = "JSON not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else if !is_object(parsed_event) { + error = "Parsed event is not a JSON object." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + event = object!(parsed_event) + + timestamp_string, err = string(event.timestamp) + if err == null { + parsed_timestamp, err = parse_timestamp(timestamp_string, "%Y-%m-%dT%H:%M:%S.%fZ") + if err == null { + .timestamp = parsed_timestamp + } else { + .errors = push(.errors, "Timestamp not parsable, using current time instead: " + err) + } + } else { + .errors = push(.errors, "Timestamp not found, using current time instead.") + } + + .logger, err = string(event.logger) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.level) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } else { + .level = level + } + + .thread = string(parsed_event.thread) ?? null + + .message, err = string(event.message) + if err != null || is_empty(.message) { + .errors = push(.errors, "Message not found.") + } + stacktrace = string(event.stackTrace) ?? "" + .message = join!(compact([.message, stacktrace]), "\n\n") + } + + extended_logs_files: + inputs: + - processed_files_* + type: remap + source: | + del(.source_type) + if .errors == [] { + del(.errors) + } + . |= parse_regex!(.file, r'^/stackable/log/(?P.*?)/(?P.*?)$') + + filtered_logs_vector: + inputs: + - vector + type: filter + condition: '!includes(["TRACE", "DEBUG"], .metadata.level)' + + extended_logs_vector: + inputs: + - filtered_logs_vector + type: remap + source: | + .container = "vector" + .level = .metadata.level + .logger = .metadata.module_path + if exists(.file) { .processed_file = del(.file) } + del(.metadata) + del(.pid) + del(.source_type) + + extended_logs: + inputs: + - extended_logs_* + type: remap + source: | + .namespace = "$NAMESPACE" + .cluster = "hdfs" + .role = "namenode" + .roleGroup = "default" + + sinks: + aggregator: + inputs: + - extended_logs + type: vector + address: $VECTOR_AGGREGATOR_ADDRESS +{% endif -%} +zkfc.log4j.properties: |+ + log4j.rootLogger=INFO, CONSOLE, FILE + + log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender + log4j.appender.CONSOLE.Threshold=INFO + log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout + log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + log4j.appender.FILE=org.apache.log4j.RollingFileAppender + log4j.appender.FILE.Threshold=INFO + log4j.appender.FILE.File=/stackable/log/zkfc/zkfc.log4j.xml + log4j.appender.FILE.MaxFileSize=5MB + log4j.appender.FILE.MaxBackupIndex=1 + log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout +