From 7c9a8717769ac7dbee6986a747f4589d19b123d0 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Tue, 26 May 2026 15:18:35 +0200 Subject: [PATCH 1/2] test(smoke_v2): Fix assertions --- .../kuttl/smoke_v2/35-assert.yaml.j2 | 600 ++++++++++++++++++ 1 file changed, 600 insertions(+) diff --git a/tests/templates/kuttl/smoke_v2/35-assert.yaml.j2 b/tests/templates/kuttl/smoke_v2/35-assert.yaml.j2 index 5e00ea70..e5a1070c 100644 --- a/tests/templates/kuttl/smoke_v2/35-assert.yaml.j2 +++ b/tests/templates/kuttl/smoke_v2/35-assert.yaml.j2 @@ -263,6 +263,606 @@ commands: {% endif %} +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vector.yaml: | + data_dir: /stackable/log/_vector-state + + log_schema: + host_key: pod + + sources: + vector: + type: internal_logs + + files_stdout: + type: file + include: + - /stackable/log/*/*.stdout.log + + files_stderr: + type: file + include: + - /stackable/log/*/*.stderr.log + + files_log4j: + type: file + include: + - /stackable/log/*/*.log4j.xml + line_delimiter: "\r\n" + multiline: + mode: halt_before + start_pattern: ^" + raw_message + "" + parsed_event, err = parse_xml(wrapped_xml_event) + if err != null { + error = "XML not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + root = object!(parsed_event.root) + if !is_object(root.event) { + error = "Parsed event contains no \"event\" tag." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + if keys(root) != ["event"] { + .errors = push(.errors, "Parsed event contains multiple tags: " + join!(keys(root), ", ")) + } + event = object!(root.event) + + epoch_milliseconds, err = to_int(event.@timestamp) + if err == null && epoch_milliseconds != 0 { + converted_timestamp, err = from_unix_timestamp(epoch_milliseconds, "milliseconds") + if err == null { + .timestamp = converted_timestamp + } else { + .errors = push(.errors, "Time not parsable, using current time instead: " + err) + } + } else { + .errors = push(.errors, "Timestamp not found, using current time instead.") + } + + .logger, err = string(event.@logger) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.@level) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } else { + .level = level + } + + message, err = string(event.message) + if err != null || is_empty(message) { + .errors = push(.errors, "Message not found.") + } + throwable = string(event.throwable) ?? "" + .message = join!(compact([message, throwable]), "\n") + } + } + + processed_files_log4j2: + inputs: + - files_log4j2 + type: remap + source: | + raw_message = string!(.message) + + .timestamp = now() + .logger = "" + .level = "INFO" + .message = "" + .errors = [] + + event = {} + parsed_event, err = parse_xml(raw_message) + if err != null { + error = "XML not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + if !is_object(parsed_event.Event) { + error = "Parsed event contains no \"Event\" tag." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + event = object!(parsed_event.Event) + + tag_instant_valid = false + instant, err = object(event.Instant) + if err == null { + epoch_nanoseconds, err = to_int(instant.@epochSecond) * 1_000_000_000 + to_int(instant.@nanoOfSecond) + if err == null && epoch_nanoseconds != 0 { + converted_timestamp, err = from_unix_timestamp(epoch_nanoseconds, "nanoseconds") + if err == null { + .timestamp = converted_timestamp + tag_instant_valid = true + } else { + .errors = push(.errors, "Instant invalid, trying property timeMillis instead: " + err) + } + } else { + .errors = push(.errors, "Instant invalid, trying property timeMillis instead: " + err) + } + } + if !tag_instant_valid { + epoch_milliseconds, err = to_int(event.@timeMillis) + if err == null && epoch_milliseconds != 0 { + converted_timestamp, err = from_unix_timestamp(epoch_milliseconds, "milliseconds") + if err == null { + .timestamp = converted_timestamp + } else { + .errors = push(.errors, "timeMillis not parsable, using current time instead: " + err) + } + } else { + .errors = push(.errors, "timeMillis not parsable, using current time instead: " + err) + } + } + + .logger, err = string(event.@loggerName) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.@level) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } else { + .level = level + } + + exception = null + thrown = event.Thrown + if is_object(thrown) { + exception = "Exception" + thread, err = string(event.@thread) + if err == null && !is_empty(thread) { + exception = exception + " in thread \"" + thread + "\"" + } + thrown_name, err = string(thrown.@name) + if err == null && !is_empty(exception) { + exception = exception + " " + thrown_name + } + message = string(thrown.@localizedMessage) ?? + string(thrown.@message) ?? + "" + if !is_empty(message) { + exception = exception + ": " + message + } + stacktrace_items = array(thrown.ExtendedStackTrace.ExtendedStackTraceItem) ?? [] + stacktrace = "" + for_each(stacktrace_items) -> |_index, value| { + stacktrace = stacktrace + " " + class = string(value.@class) ?? "" + method = string(value.@method) ?? "" + if !is_empty(class) && !is_empty(method) { + stacktrace = stacktrace + "at " + class + "." + method + } + file = string(value.@file) ?? "" + line = string(value.@line) ?? "" + if !is_empty(file) && !is_empty(line) { + stacktrace = stacktrace + "(" + file + ":" + line + ")" + } + exact = to_bool(value.@exact) ?? false + location = string(value.@location) ?? "" + version = string(value.@version) ?? "" + if !is_empty(location) && !is_empty(version) { + stacktrace = stacktrace + " " + if !exact { + stacktrace = stacktrace + "~" + } + stacktrace = stacktrace + "[" + location + ":" + version + "]" + } + stacktrace = stacktrace + "\n" + } + if stacktrace != "" { + exception = exception + "\n" + stacktrace + } + } + + message, err = string(event.Message) + if err != null || is_empty(message) { + message = null + .errors = push(.errors, "Message not found.") + } + .message = join!(compact([message, exception]), "\n") + } + } + + processed_files_py: + inputs: + - files_py + type: remap + source: | + raw_message = string!(.message) + + .timestamp = now() + .logger = "" + .level = "INFO" + .message = "" + .errors = [] + + parsed_event, err = parse_json(raw_message) + if err != null { + error = "JSON not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else if !is_object(parsed_event) { + error = "Parsed event is not a JSON object." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + event = object!(parsed_event) + + asctime, err = string(event.asctime) + if err == null { + parsed_timestamp, err = parse_timestamp(asctime, "%F %T,%3f") + if err == null { + .timestamp = parsed_timestamp + } else { + .errors = push(.errors, "Timestamp not parsable, using current time instead: "+ err) + } + } else { + .errors = push(.errors, "Timestamp not found, using current time instead.") + } + + .logger, err = string(event.name) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.levelname) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if level == "DEBUG" { + .level = "DEBUG" + } else if level == "INFO" { + .level = "INFO" + } else if level == "WARNING" { + .level = "WARN" + } else if level == "ERROR" { + .level = "ERROR" + } else if level == "CRITICAL" { + .level = "FATAL" + } else { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } + + .message, err = string(event.message) + if err != null || is_empty(.message) { + .errors = push(.errors, "Message not found.") + } + } + + processed_files_airlift: + inputs: + - files_airlift + type: remap + source: | + raw_message = string!(.message) + + .timestamp = now() + .logger = "" + .level = "INFO" + .message = "" + .errors = [] + + parsed_event, err = parse_json(raw_message) + if err != null { + error = "JSON not parsable: " + err + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else if !is_object(parsed_event) { + error = "Parsed event is not a JSON object." + .errors = push(.errors, error) + log(error, level: "warn") + .message = raw_message + } else { + event = object!(parsed_event) + + timestamp_string, err = string(event.timestamp) + if err == null { + parsed_timestamp, err = parse_timestamp(timestamp_string, "%Y-%m-%dT%H:%M:%S.%fZ") + if err == null { + .timestamp = parsed_timestamp + } else { + .errors = push(.errors, "Timestamp not parsable, using current time instead: " + err) + } + } else { + .errors = push(.errors, "Timestamp not found, using current time instead.") + } + + .logger, err = string(event.logger) + if err != null || is_empty(.logger) { + .errors = push(.errors, "Logger not found.") + } + + level, err = string(event.level) + if err != null { + .errors = push(.errors, "Level not found, using \"" + .level + "\" instead.") + } else if !includes(["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"], level) { + .errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.") + } else { + .level = level + } + + .thread = string(parsed_event.thread) ?? null + + .message, err = string(event.message) + if err != null || is_empty(.message) { + .errors = push(.errors, "Message not found.") + } + stacktrace = string(event.stackTrace) ?? "" + .message = join!(compact([.message, stacktrace]), "\n\n") + } + + extended_logs_files: + inputs: + - processed_files_* + type: remap + source: | + del(.source_type) + if .errors == [] { + del(.errors) + } + . |= parse_regex!(.file, r'^/stackable/log/(?P.*?)/(?P.*?)$') + + filtered_logs_vector: + inputs: + - vector + type: filter + condition: '!includes(["TRACE", "DEBUG"], .metadata.level)' + + extended_logs_vector: + inputs: + - filtered_logs_vector + type: remap + source: | + .container = "vector" + .level = .metadata.level + .logger = .metadata.module_path + if exists(.file) { .processed_file = del(.file) } + del(.metadata) + del(.pid) + del(.source_type) + + extended_logs: + inputs: + - extended_logs_* + type: remap + source: | + .namespace = "kuttl-test1" + .cluster = "nifi" + .role = "node" + .roleGroup = "default" + + sinks: + aggregator: + inputs: + - extended_logs + type: vector + address: $VECTOR_AGGREGATOR_ADDRESS +{% endif %} YAMLEOF ) actual=$(kubectl -n $NAMESPACE get cm nifi-node-default -o yaml | yq -o=json '.data') From 749bfa2b6ba5bdf96361d62238eeebdc6e3a5415 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Tue, 26 May 2026 16:53:07 +0200 Subject: [PATCH 2/2] test(smoke_v2): Fix assertions --- tests/templates/kuttl/smoke_v2/35-assert.yaml.j2 | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/templates/kuttl/smoke_v2/35-assert.yaml.j2 b/tests/templates/kuttl/smoke_v2/35-assert.yaml.j2 index e5a1070c..4d5e8ddd 100644 --- a/tests/templates/kuttl/smoke_v2/35-assert.yaml.j2 +++ b/tests/templates/kuttl/smoke_v2/35-assert.yaml.j2 @@ -19,7 +19,7 @@ kind: TestAssert timeout: 60 commands: - script: | - expected=$(cat <<'YAMLEOF' | yq -o=json + expected=$(cat <<'YAMLEOF' | envsubst '$NAMESPACE' | yq -o=json authorizers.xml: | @@ -851,7 +851,7 @@ commands: - extended_logs_* type: remap source: | - .namespace = "kuttl-test1" + .namespace = "$NAMESPACE" .cluster = "nifi" .role = "node" .roleGroup = "default"