diff --git a/quickwit/quickwit-indexing/src/actors/packager.rs b/quickwit/quickwit-indexing/src/actors/packager.rs index 9dbb1fd8963..5a20f9ecf5d 100644 --- a/quickwit/quickwit-indexing/src/actors/packager.rs +++ b/quickwit/quickwit-indexing/src/actors/packager.rs @@ -27,9 +27,7 @@ use quickwit_common::temp_dir::TempDirectory; use quickwit_directories::write_hotcache; use quickwit_doc_mapper::NamedField; use quickwit_doc_mapper::tag_pruning::append_to_tag_set; -use quickwit_proto::search::{ - ListFieldType, ListFields, ListFieldsEntryResponse, serialize_split_fields, -}; +use quickwit_proto::search::{ListFieldsEntry, ListFieldsMetadata, ListFieldsType}; use tantivy::index::FieldMetadata; use tantivy::schema::{FieldType, Type}; use tantivy::{InvertedIndexReader, ReloadPolicy, SegmentMeta}; @@ -314,7 +312,7 @@ fn create_packaged_split( build_hotcache(split.split_scratch_directory.path(), &mut hotcache_bytes)?; ctx.record_progress(); - let serialized_split_fields = serialize_field_metadata(&fields_metadata); + let serialized_split_fields = serialize_fields_metadata(&fields_metadata); let packaged_split = PackagedSplit { serialized_split_fields, @@ -327,37 +325,19 @@ fn create_packaged_split( Ok(packaged_split) } -/// Serializes the Split fields. -/// -/// `fields_metadata` has to be sorted. -fn serialize_field_metadata(fields_metadata: &[FieldMetadata]) -> Vec { - let fields = fields_metadata +/// Serializes the fields metadata from a split sorted by (name, type). +fn serialize_fields_metadata(fields_metadata: &[FieldMetadata]) -> Vec { + let entries = fields_metadata .iter() - .map(field_metadata_to_list_field_serialized) + .map(field_metadata_to_list_fields_entry) + .sorted_unstable_by(|left, right| left.cmp_by_name_and_type(right)) .collect::>(); - serialize_split_fields(ListFields { fields }) -} - -fn tantivy_type_to_list_field_type(typ: Type) -> ListFieldType { - match typ { - Type::Str => ListFieldType::Str, - Type::U64 => ListFieldType::U64, - Type::I64 => ListFieldType::I64, - Type::F64 => ListFieldType::F64, - Type::Bool => ListFieldType::Bool, - Type::Date => ListFieldType::Date, - Type::Facet => ListFieldType::Facet, - Type::Bytes => ListFieldType::Bytes, - Type::Json => ListFieldType::Json, - Type::IpAddr => ListFieldType::IpAddr, - } + ListFieldsMetadata { entries }.serialize() } -fn field_metadata_to_list_field_serialized( - field_metadata: &FieldMetadata, -) -> ListFieldsEntryResponse { - ListFieldsEntryResponse { +fn field_metadata_to_list_fields_entry(field_metadata: &FieldMetadata) -> ListFieldsEntry { + ListFieldsEntry { field_name: field_metadata.field_name.to_string(), field_type: tantivy_type_to_list_field_type(field_metadata.typ) as i32, searchable: field_metadata.is_indexed(), @@ -368,6 +348,21 @@ fn field_metadata_to_list_field_serialized( } } +fn tantivy_type_to_list_field_type(typ: Type) -> ListFieldsType { + match typ { + Type::Bool => ListFieldsType::Bool, + Type::Bytes => ListFieldsType::Bytes, + Type::Date => ListFieldsType::Date, + Type::F64 => ListFieldsType::F64, + Type::Facet => ListFieldsType::Facet, + Type::I64 => ListFieldsType::I64, + Type::IpAddr => ListFieldsType::IpAddr, + Type::Json => ListFieldsType::Json, + Type::Str => ListFieldsType::Str, + Type::U64 => ListFieldsType::U64, + } +} + /// Reads u64 from stored term data. fn u64_from_term_data(data: &[u8]) -> anyhow::Result { let u64_bytes: [u8; 8] = data[0..8] @@ -382,7 +377,7 @@ mod tests { use quickwit_actors::{ObservationType, Universe}; use quickwit_metastore::checkpoint::IndexCheckpointDelta; - use quickwit_proto::search::{ListFieldsEntryResponse, deserialize_split_fields}; + use quickwit_proto::search::{ListFieldsEntry, ListFieldsMetadata}; use quickwit_proto::types::{DocMappingUid, IndexUid, NodeId}; use tantivy::directory::MmapDirectory; use tantivy::schema::{FAST, NumericOptions, STRING, Schema, TEXT, Type}; @@ -424,24 +419,24 @@ mod tests { }, ]; - let out = serialize_field_metadata(&fields_metadata); + let out = serialize_fields_metadata(&fields_metadata); - let deserialized: Vec = - deserialize_split_fields(&mut &out[..]).unwrap().fields; + let deserialized: Vec = + ListFieldsMetadata::deserialize(&out[..]).unwrap().entries; assert_eq!(fields_metadata.len(), deserialized.len()); assert_eq!(deserialized[0].field_name, "test"); - assert_eq!(deserialized[0].field_type, ListFieldType::Str as i32); + assert_eq!(deserialized[0].field_type, ListFieldsType::Str as i32); assert!(deserialized[0].searchable); assert!(deserialized[0].aggregatable); assert_eq!(deserialized[1].field_name, "test2"); - assert_eq!(deserialized[1].field_type, ListFieldType::Str as i32); + assert_eq!(deserialized[1].field_type, ListFieldsType::Str as i32); assert!(deserialized[1].searchable); assert!(!deserialized[1].aggregatable); assert_eq!(deserialized[2].field_name, "test3"); - assert_eq!(deserialized[2].field_type, ListFieldType::U64 as i32); + assert_eq!(deserialized[2].field_type, ListFieldsType::U64 as i32); assert!(deserialized[2].searchable); assert!(deserialized[2].aggregatable); } diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 73396bbf982..7bedc6a7473 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -117,7 +117,7 @@ message ListFieldsRequest { repeated string index_id_patterns = 1; // Optional limit query to a list of fields // Wildcard expressions are supported. - repeated string fields = 2; + repeated string field_patterns = 2; // Time filter, expressed in seconds since epoch. // That filter is to be interpreted as the semi-open interval: @@ -144,16 +144,23 @@ message LeafListFieldsRequest { // Optional limit query to a list of fields // Wildcard expressions are supported. - repeated string fields = 4; + repeated string field_patterns = 4; } +/// Message returned by leaf and root list fields requests. message ListFieldsResponse { - repeated ListFieldsEntryResponse fields = 1; + repeated ListFieldsEntry entries = 1; } -message ListFieldsEntryResponse { +/// Message containing the fields metadata for a split sorted by (name, type) and stored zstd-compressed in the split. Currently duplicate of ListFieldsResponse, but kept +/// distinct so they can evolve independently. +message ListFieldsMetadata { + repeated ListFieldsEntry entries = 1; +} + +message ListFieldsEntry { string field_name = 1; - ListFieldType field_type = 2; + ListFieldsType field_type = 2; // The index ids the field exists repeated string index_ids = 3; // True means the field is searchable (indexed) in at least some indices. @@ -168,7 +175,7 @@ message ListFieldsEntryResponse { repeated string non_aggregatable_index_ids = 7; } -enum ListFieldType { +enum ListFieldsType { STR = 0; U64 = 1; I64 = 2; @@ -180,9 +187,7 @@ enum ListFieldType { IP_ADDR = 8; JSON = 9; } -message ListFields { - repeated ListFieldsEntryResponse fields = 1; -} + // -- Search ------------------- message SearchRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index a284cef1513..df17e2b1095 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -62,7 +62,7 @@ pub struct ListFieldsRequest { /// Optional limit query to a list of fields /// Wildcard expressions are supported. #[prost(string, repeated, tag = "2")] - pub fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + pub field_patterns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// Time filter, expressed in seconds since epoch. /// That filter is to be interpreted as the semi-open interval: /// \[start_timestamp, end_timestamp). @@ -91,20 +91,29 @@ pub struct LeafListFieldsRequest { /// Optional limit query to a list of fields /// Wildcard expressions are supported. #[prost(string, repeated, tag = "4")] - pub fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + pub field_patterns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } +/// / Message returned by leaf and root list fields requests. #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ListFieldsResponse { #[prost(message, repeated, tag = "1")] - pub fields: ::prost::alloc::vec::Vec, + pub entries: ::prost::alloc::vec::Vec, +} +/// / Message containing the fields metadata for a split sorted by (name, type) and stored zstd-compressed in the split. Currently duplicate of ListFieldsResponse, but kept +/// / distinct so they can evolve independently. +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListFieldsMetadata { + #[prost(message, repeated, tag = "1")] + pub entries: ::prost::alloc::vec::Vec, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] -pub struct ListFieldsEntryResponse { +pub struct ListFieldsEntry { #[prost(string, tag = "1")] pub field_name: ::prost::alloc::string::String, - #[prost(enumeration = "ListFieldType", tag = "2")] + #[prost(enumeration = "ListFieldsType", tag = "2")] pub field_type: i32, /// The index ids the field exists #[prost(string, repeated, tag = "3")] @@ -129,12 +138,6 @@ pub struct ListFieldsEntryResponse { >, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ListFields { - #[prost(message, repeated, tag = "1")] - pub fields: ::prost::alloc::vec::Vec, -} -#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Hash, Eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct SearchRequest { @@ -739,7 +742,7 @@ pub struct LeafListTermsResponse { #[serde(rename_all = "snake_case")] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] -pub enum ListFieldType { +pub enum ListFieldsType { Str = 0, U64 = 1, I64 = 2, @@ -751,7 +754,7 @@ pub enum ListFieldType { IpAddr = 8, Json = 9, } -impl ListFieldType { +impl ListFieldsType { /// String value of the enum field names used in the ProtoBuf definition. /// /// The values are not transformed in any way and thus are considered stable diff --git a/quickwit/quickwit-proto/src/search/mod.rs b/quickwit/quickwit-proto/src/search/mod.rs index 86298db0f32..c1b32010220 100644 --- a/quickwit/quickwit-proto/src/search/mod.rs +++ b/quickwit/quickwit-proto/src/search/mod.rs @@ -229,48 +229,116 @@ impl PartialHit { } } -/// Serializes the Split fields. -/// -/// `fields_metadata` has to be sorted. -pub fn serialize_split_fields(list_fields: ListFields) -> Vec { - let payload = list_fields.encode_to_vec(); - let compression_level = 3; - let payload_compressed = zstd::stream::encode_all(&mut &payload[..], compression_level) - .expect("zstd encoding failed"); - let mut out = Vec::new(); - // Write Header -- Format Version 2 - let format_version = 2u8; - out.push(format_version); - // Write Payload - out.extend_from_slice(&payload_compressed); - out -} +/// On-disk format version for serialized [`ListFieldsMetadata`]. Bumped whenever the wire format +/// produced by [`ListFieldsMetadata::serialize`] changes in a way readers can't tolerate. +const SPLIT_FIELDS_FORMAT_VERSION: u8 = 2; + +/// Zstd compression level used when writing split fields. +const SPLIT_FIELDS_COMPRESSION_LEVEL: i32 = 3; + +impl ListFieldsMetadata { + /// Serializes the entries: one version byte followed by the zstd-compressed protobuf + /// encoding of `Self`. + pub fn serialize(&self) -> Vec { + let payload = self.encode_to_vec(); + let mut out = vec![SPLIT_FIELDS_FORMAT_VERSION]; + zstd::stream::copy_encode(&payload[..], &mut out, SPLIT_FIELDS_COMPRESSION_LEVEL) + .expect("zstd encoding into `Vec` should not fail"); + out + } + + /// Reads the format produced by [`Self::serialize`]. + pub fn deserialize(mut reader: R) -> io::Result { + let mut version_byte = [0u8; 1]; + reader.read_exact(&mut version_byte)?; + + if version_byte[0] != SPLIT_FIELDS_FORMAT_VERSION { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "unsupported split fields format version: {}", + version_byte[0] + ), + )); + } + let mut zstd_decoder = zstd::stream::read::Decoder::new(reader)?; + let mut decompressed = Vec::new(); + zstd_decoder.read_to_end(&mut decompressed)?; -/// Reads a fixed number of bytes into an array and returns the array. -fn read_exact_array(reader: &mut impl Read) -> io::Result<[u8; N]> { - let mut buffer = [0u8; N]; - reader.read_exact(&mut buffer)?; - Ok(buffer) + Self::decode(&decompressed[..]) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err)) + } } -/// Reads the Split fields from a zstd compressed stream of bytes -pub fn deserialize_split_fields(mut reader: R) -> io::Result { - let format_version = read_exact_array::<1>(&mut reader)?[0]; - if format_version != 2 { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - format!("Unsupported split field format version: {format_version}"), - )); +impl ListFieldsEntry { + pub fn cmp_by_name_and_type(&self, other: &Self) -> Ordering { + self.field_name + .cmp(&other.field_name) + .then_with(|| self.field_type.cmp(&other.field_type)) } - let reader = zstd::Decoder::new(reader)?; - read_split_fields_from_zstd(reader) } -/// Reads the Split fields from a stream of bytes -#[allow(clippy::unbuffered_bytes)] -fn read_split_fields_from_zstd(reader: R) -> io::Result { - let all_bytes: Vec<_> = reader.bytes().collect::>()?; - let serialized_list_fields: ListFields = prost::Message::decode(&all_bytes[..])?; +#[cfg(test)] +mod tests { + use super::*; + + fn entry(field_name: &str) -> ListFieldsEntry { + ListFieldsEntry { + field_name: field_name.to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + index_ids: vec!["index-1".to_string()], + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + } + } - Ok(serialized_list_fields) + #[test] + fn list_fields_entries_roundtrip() { + let entries = ListFieldsMetadata { + entries: vec![entry("a"), entry("b"), entry("c")], + }; + let buf = entries.serialize(); + let decoded = ListFieldsMetadata::deserialize(&buf[..]).unwrap(); + assert_eq!(decoded, entries); + } + + #[test] + fn list_fields_entries_empty_roundtrip() { + let entries = ListFieldsMetadata { + entries: Vec::new(), + }; + let buf = entries.serialize(); + // Just the version byte plus an (essentially empty) zstd frame. + assert_eq!(buf[0], SPLIT_FIELDS_FORMAT_VERSION); + let decoded = ListFieldsMetadata::deserialize(&buf[..]).unwrap(); + assert_eq!(decoded, entries); + } + + #[test] + fn list_fields_entries_wire_compatible_with_encode() { + // `serialize` must produce the same compressed payload as a one-shot `encode_to_vec`, + // so existing readers / on-disk snapshots stay compatible. + let entries = ListFieldsMetadata { + entries: vec![entry("a"), entry("b")], + }; + let actual = entries.serialize(); + + let one_shot_encoded = entries.encode_to_vec(); + let mut expected = vec![SPLIT_FIELDS_FORMAT_VERSION]; + let compressed = + zstd::stream::encode_all(&one_shot_encoded[..], SPLIT_FIELDS_COMPRESSION_LEVEL) + .unwrap(); + expected.extend_from_slice(&compressed); + + assert_eq!(actual, expected); + } + + #[test] + fn list_fields_entries_rejects_unknown_version() { + let buf = [0xFFu8]; + let err = ListFieldsMetadata::deserialize(&buf[..]).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidData); + } } diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index d4347afcc6c..4ef63430b56 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -29,7 +29,6 @@ mod invoker; pub mod leaf; mod leaf_cache; mod list_fields; -mod list_fields_cache; mod list_terms; mod metrics_trackers; mod retry; diff --git a/quickwit/quickwit-search/src/list_fields.rs b/quickwit/quickwit-search/src/list_fields.rs deleted file mode 100644 index b5974867cfd..00000000000 --- a/quickwit/quickwit-search/src/list_fields.rs +++ /dev/null @@ -1,847 +0,0 @@ -// Copyright 2021-Present Datadog, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::{HashMap, HashSet}; -use std::path::Path; -use std::str::FromStr; -use std::sync::{Arc, LazyLock}; - -use anyhow::Context; -use futures::future; -use futures::future::try_join_all; -use itertools::Itertools; -use quickwit_common::rate_limited_warn; -use quickwit_common::shared_consts::{FIELD_PRESENCE_FIELD_NAME, SPLIT_FIELDS_FILE_NAME}; -use quickwit_common::uri::Uri; -use quickwit_config::build_doc_mapper; -use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; -use quickwit_metastore::SplitMetadata; -use quickwit_proto::metastore::MetastoreServiceClient; -use quickwit_proto::search::{ - LeafListFieldsRequest, ListFields, ListFieldsEntryResponse, ListFieldsRequest, - ListFieldsResponse, SplitIdAndFooterOffsets, deserialize_split_fields, -}; -use quickwit_proto::types::{IndexId, IndexUid}; -use quickwit_query::query_ast::QueryAst; -use quickwit_storage::Storage; - -use crate::leaf::open_split_bundle; -use crate::search_job_placer::group_jobs_by_index_id; -use crate::service::SearcherContext; -use crate::{ - ClusterClient, SearchError, SearchJob, list_relevant_splits, resolve_index_patterns, - search_thread_pool, -}; - -/// QW_FIELD_LIST_SIZE_LIMIT defines a hard limit on the number of fields that -/// can be returned (error otherwise). -/// -/// Having many fields can happen when a user is creating fields dynamically in -/// a JSON type with random field names. This leads to huge memory consumption -/// when building the response. This is a workaround until a way is found to -/// prune the long tail of rare fields. -static FIELD_LIST_SIZE_LIMIT: LazyLock = - LazyLock::new(|| quickwit_common::get_from_env("QW_FIELD_LIST_SIZE_LIMIT", 100_000, false)); - -const DYNAMIC_FIELD_PREFIX: &str = "_dynamic."; - -/// Get the list of fields in the given split. -/// The returned list is guaranteed to be strictly sorted by (field_name, field_type). -async fn get_fields_from_split( - searcher_context: &SearcherContext, - index_id: IndexId, - split_and_footer_offsets: &SplitIdAndFooterOffsets, - index_storage: Arc, -) -> anyhow::Result> { - if let Some(list_fields) = searcher_context - .list_fields_cache - .get(split_and_footer_offsets.clone()) - { - return Ok(list_fields.fields); - } - let (_, split_bundle) = - open_split_bundle(searcher_context, index_storage, split_and_footer_offsets).await?; - - let serialized_split_fields = split_bundle - .get_all(Path::new(SPLIT_FIELDS_FILE_NAME)) - .await?; - let serialized_split_fields_len = serialized_split_fields.len(); - let list_fields_proto = - deserialize_split_fields(serialized_split_fields).with_context(|| { - format!("could not read split fields (serialized len: {serialized_split_fields_len})",) - })?; - - let mut list_fields = list_fields_proto.fields; - list_fields.retain(|list_field_entry| list_field_entry.field_name != FIELD_PRESENCE_FIELD_NAME); - - for list_field_entry in list_fields.iter_mut() { - list_field_entry.index_ids = vec![index_id.to_string()]; - - if list_field_entry - .field_name - .starts_with(DYNAMIC_FIELD_PREFIX) - { - list_field_entry - .field_name - .replace_range(..DYNAMIC_FIELD_PREFIX.len(), ""); - } - } - - // We sort our fields, as the removal of dynamic_field prefix could have caused them to be out - // of order. We also defensively make sure there are no duplicates here. - make_sorted_and_dedup(&mut list_fields); - - // Put result into cache - searcher_context.list_fields_cache.put( - split_and_footer_offsets.clone(), - ListFields { - fields: list_fields.clone(), - }, - ); - - Ok(list_fields) -} - -fn field_order( - left: &ListFieldsEntryResponse, - right: &ListFieldsEntryResponse, -) -> std::cmp::Ordering { - left.field_name - .cmp(&right.field_name) - .then_with(|| left.field_type.cmp(&right.field_type)) -} - -// Sorts and deduplicates the list of fields. -// -// If somehow we end up with duplicate fields, only the first one is kept, -// and we log a warning. -fn make_sorted_and_dedup(list_fields: &mut Vec) { - list_fields.sort_unstable_by(field_order); - - // We defensively make sure there are no duplicates here. - list_fields.dedup_by(|left, right| { - if left.field_name == right.field_name && left.field_type == right.field_type { - rate_limited_warn!( - limit_per_min = 1, - left.field_name, - "duplicate fields found, please report" - ); - true - } else { - false - } - }); -} - -/// `current_group` needs to contain at least one element. -/// The group needs to be of the same field name and type. -fn merge_same_field_group( - current_group: &mut Vec, -) -> ListFieldsEntryResponse { - // Make sure all fields have the same name and type in current_group - assert!(!current_group.is_empty()); - assert!( - current_group - .windows(2) - .all(|window| window[0].field_name == window[1].field_name - && window[0].field_type == window[1].field_type) - ); - - if current_group.len() == 1 { - return current_group - .pop() - .expect("`current_group` should not be empty"); - } - let metadata = current_group - .last() - .expect("`current_group` should not be empty"); - let searchable = current_group.iter().any(|entry| entry.searchable); - let aggregatable = current_group.iter().any(|entry| entry.aggregatable); - let field_name = metadata.field_name.to_string(); - let field_type = metadata.field_type; - let mut non_searchable_index_ids = if searchable { - // We need to combine the non_searchable_index_ids + index_ids where searchable is set to - // false (as they are all non_searchable) - current_group - .iter() - .flat_map(|entry| { - if !entry.searchable { - entry.index_ids.iter().cloned() - } else { - entry.non_searchable_index_ids.iter().cloned() - } - }) - .collect() - } else { - // Not searchable => no need to list all the indices - Vec::new() - }; - non_searchable_index_ids.sort_unstable(); - non_searchable_index_ids.dedup(); - - let mut non_aggregatable_index_ids = if aggregatable { - // We need to combine the non_aggregatable_index_ids + index_ids where aggregatable is set - // to false (as they are all non_aggregatable) - current_group - .iter() - .flat_map(|entry| { - if !entry.aggregatable { - entry.index_ids.iter().cloned() - } else { - entry.non_aggregatable_index_ids.iter().cloned() - } - }) - .collect() - } else { - // Not aggregatable => no need to list all the indices - Vec::new() - }; - non_aggregatable_index_ids.sort_unstable(); - non_aggregatable_index_ids.dedup(); - let mut index_ids: Vec = current_group - .drain(..) - .flat_map(|entry| entry.index_ids.into_iter()) - .collect(); - index_ids.sort_unstable(); - index_ids.dedup(); - - ListFieldsEntryResponse { - field_name, - field_type, - searchable, - aggregatable, - non_searchable_index_ids, - non_aggregatable_index_ids, - index_ids, - } -} - -/// Merge iterators of ListFieldsEntryResponse into a `Vec`. -/// -/// The iterators need to be sorted by (field_name, fieldtype) -fn merge_leaf_list_fields( - iterators: Vec>, -) -> crate::Result> { - let merged = iterators - .into_iter() - .kmerge_by(|a, b| (&a.field_name, a.field_type) <= (&b.field_name, b.field_type)); - let mut responses = Vec::new(); - - let mut current_group: Vec = Vec::new(); - // Build ListFieldsEntryResponse from current group - let flush_group = |responses: &mut Vec<_>, current_group: &mut Vec| { - let entry = merge_same_field_group(current_group); - responses.push(entry); - current_group.clear(); - }; - - for entry in merged { - if let Some(last) = current_group.last() - && (last.field_name != entry.field_name || last.field_type != entry.field_type) - { - flush_group(&mut responses, &mut current_group); - } - if responses.len() >= *FIELD_LIST_SIZE_LIMIT { - return Err(SearchError::Internal(format!( - "list fields response exceeded {} fields", - *FIELD_LIST_SIZE_LIMIT - ))); - } - current_group.push(entry); - } - if !current_group.is_empty() { - flush_group(&mut responses, &mut current_group); - } - - Ok(responses) -} - -// Returns true if any of the patterns match the field name. -fn matches_any_pattern(field_name: &str, field_patterns: &[FieldPattern]) -> bool { - field_patterns - .iter() - .any(|pattern| pattern.matches(field_name)) -} - -enum FieldPattern { - Match { field: String }, - Wildcard { prefix: String, suffix: String }, -} - -impl FromStr for FieldPattern { - type Err = crate::SearchError; - - fn from_str(field_pattern: &str) -> crate::Result { - match field_pattern.find('*') { - None => Ok(FieldPattern::Match { - field: field_pattern.to_string(), - }), - Some(pos) => { - let prefix = field_pattern[..pos].to_string(); - let suffix = field_pattern[pos + 1..].to_string(); - if suffix.contains("*") { - return Err(crate::SearchError::InvalidArgument(format!( - "invalid field pattern `{field_pattern}`: we only support one wildcard" - ))); - } - Ok(FieldPattern::Wildcard { prefix, suffix }) - } - } - } -} - -impl FieldPattern { - pub fn matches(&self, field_name: &str) -> bool { - match self { - FieldPattern::Match { field } => field == field_name, - FieldPattern::Wildcard { prefix, suffix } => { - field_name.starts_with(prefix) && field_name.ends_with(suffix) - } - } - } -} - -/// `leaf` step of list fields. -/// -/// Returns field metadata from the assigned splits. -pub async fn leaf_list_fields( - index_id: IndexId, - index_storage: Arc, - searcher_context: &SearcherContext, - split_ids: &[SplitIdAndFooterOffsets], - field_patterns_str: &[String], -) -> crate::Result { - let field_patterns: Vec = field_patterns_str - .iter() - .map(|pattern_str| FieldPattern::from_str(pattern_str)) - .collect::>()?; - - // If no splits, return empty response - if split_ids.is_empty() { - return Ok(ListFieldsResponse { fields: Vec::new() }); - } - - // Get fields from all splits - let single_split_list_fields_futures: Vec<_> = split_ids - .iter() - .map(|split_id| { - get_fields_from_split( - searcher_context, - index_id.to_string(), - split_id, - index_storage.clone(), - ) - }) - .collect(); - - let mut single_split_list_fields_vec: Vec> = - future::try_join_all(single_split_list_fields_futures).await?; - - let fields = search_thread_pool() - .run_cpu_intensive(move || { - for single_split_list_fields in &mut single_split_list_fields_vec { - // This contract is enforced on a different node, etc. so we defensively check that - // the fields are sorted and deduplicated. - if !single_split_list_fields.is_sorted_by(|left, right| { - // Checking on less ensure that this is both sorted AND that there are no - // duplicates - field_order(left, right) == std::cmp::Ordering::Less - }) { - rate_limited_warn!( - limit_per_min = 1, - "contract breach: fields returned by a leaf are not strictly sorted! \ - please report" - ); - make_sorted_and_dedup(single_split_list_fields); - } - } - - let filtered_list_fields_sorted_iters: Vec<_> = single_split_list_fields_vec - .into_iter() - .map(|list_fields_sorted| { - list_fields_sorted.into_iter().filter(|field| { - if field_patterns.is_empty() { - true - } else { - matches_any_pattern(&field.field_name, &field_patterns) - } - }) - }) - .collect(); - merge_leaf_list_fields(filtered_list_fields_sorted_iters) - }) - .await - .context("failed to merge single split list fields")??; - Ok(ListFieldsResponse { fields }) -} - -/// Index metas needed for executing a leaf list fields request. -#[derive(Clone, Debug)] -pub struct IndexMetasForLeafSearch { - /// Index id. - pub index_id: IndexId, - /// Index URI. - pub index_uri: Uri, -} - -/// Performs a distributed list fields request. -/// 1. Sends leaf requests over gRPC to multiple leaf nodes. -/// 2. Merges the search results. -/// 3. Builds the response and returns. -pub async fn root_list_fields( - list_fields_req: ListFieldsRequest, - cluster_client: &ClusterClient, - mut metastore: MetastoreServiceClient, -) -> crate::Result { - let indexes_metadata = - resolve_index_patterns(&list_fields_req.index_id_patterns[..], &mut metastore).await?; - // The request contains a wildcard, but couldn't find any index. - if indexes_metadata.is_empty() { - return Ok(ListFieldsResponse { fields: Vec::new() }); - } - - // Build index metadata map and extract timestamp field for time range refinement - let mut index_uid_to_index_meta: HashMap = HashMap::new(); - let mut index_uids: Vec = Vec::new(); - let mut timestamp_field_opt: Option = None; - - for index_metadata in indexes_metadata { - // Extract timestamp field for time range refinement (use first index's field) - if timestamp_field_opt.is_none() - && list_fields_req.query_ast.is_some() - && let Ok(doc_mapper) = build_doc_mapper( - &index_metadata.index_config.doc_mapping, - &index_metadata.index_config.search_settings, - ) - { - timestamp_field_opt = doc_mapper.timestamp_field_name().map(|s| s.to_string()); - } - - let index_metadata_for_leaf_search = IndexMetasForLeafSearch { - index_uri: index_metadata.index_uri().clone(), - index_id: index_metadata.index_config.index_id.to_string(), - }; - - index_uids.push(index_metadata.index_uid.clone()); - index_uid_to_index_meta.insert( - index_metadata.index_uid.clone(), - index_metadata_for_leaf_search, - ); - } - - // Extract tags and refine time range from query_ast for split pruning - let mut start_timestamp = list_fields_req.start_timestamp; - let mut end_timestamp = list_fields_req.end_timestamp; - let tags_filter_opt = if let Some(ref query_ast_json) = list_fields_req.query_ast { - let query_ast: QueryAst = serde_json::from_str(query_ast_json) - .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; - - // Refine time range from query AST if timestamp field is available - if let Some(ref timestamp_field) = timestamp_field_opt { - crate::root::refine_start_end_timestamp_from_ast( - &query_ast, - timestamp_field, - &mut start_timestamp, - &mut end_timestamp, - ); - } - - extract_tags_from_query(query_ast) - } else { - None - }; - - let split_metadatas: Vec = list_relevant_splits( - index_uids, - start_timestamp, - end_timestamp, - tags_filter_opt, - &mut metastore, - ) - .await?; - - // Build requests for each index id - let jobs: Vec = split_metadatas.iter().map(SearchJob::from).collect(); - let assigned_leaf_search_jobs = cluster_client - .search_job_placer - .assign_jobs(jobs, &HashSet::default()) - .await?; - let mut leaf_request_tasks = Vec::new(); - // For each node, forward to a node with an affinity for that index id. - for (client, client_jobs) in assigned_leaf_search_jobs { - let leaf_requests = - jobs_to_leaf_requests(&list_fields_req, &index_uid_to_index_meta, client_jobs)?; - for leaf_request in leaf_requests { - leaf_request_tasks.push(cluster_client.leaf_list_fields(leaf_request, client.clone())); - } - } - let leaf_list_fields_protos: Vec = try_join_all(leaf_request_tasks).await?; - let fields = search_thread_pool() - .run_cpu_intensive(move || { - let leaf_list_fields = leaf_list_fields_protos - .into_iter() - .map(|leaf_list_fields_proto| leaf_list_fields_proto.fields.into_iter()) - .collect(); - merge_leaf_list_fields(leaf_list_fields) - }) - .await - .context("failed to merge leaf list fields responses")??; - - Ok(ListFieldsResponse { fields }) -} - -/// Builds a list of [`LeafListFieldsRequest`], one per index, from a list of [`SearchJob`]. -pub fn jobs_to_leaf_requests( - request: &ListFieldsRequest, - index_uid_to_id: &HashMap, - jobs: Vec, -) -> crate::Result> { - let search_request_for_leaf = request.clone(); - let mut leaf_search_requests = Vec::new(); - // Group jobs by index uid. - group_jobs_by_index_id(jobs, |job_group| { - let index_uid = &job_group[0].index_uid; - let index_meta = index_uid_to_id.get(index_uid).ok_or_else(|| { - SearchError::Internal(format!( - "received list fields job for an unknown index {index_uid}. it should never happen" - )) - })?; - - let leaf_search_request = LeafListFieldsRequest { - index_id: index_meta.index_id.to_string(), - index_uri: index_meta.index_uri.to_string(), - fields: search_request_for_leaf.fields.clone(), - split_offsets: job_group.into_iter().map(|job| job.offsets).collect(), - }; - leaf_search_requests.push(leaf_search_request); - Ok(()) - })?; - - Ok(leaf_search_requests) -} - -#[cfg(test)] -mod tests { - use quickwit_proto::search::{ListFieldType, ListFieldsEntryResponse}; - - use super::*; - - #[test] - fn merge_leaf_list_fields_identical_test() { - let entry1 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let entry2 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let resp = merge_leaf_list_fields(vec![ - vec![entry1.clone()].into_iter(), - vec![entry2.clone()].into_iter(), - ]) - .unwrap(); - assert_eq!(resp, vec![entry1]); - } - #[test] - fn merge_leaf_list_fields_different_test() { - let entry1 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let entry2 = ListFieldsEntryResponse { - field_name: "field2".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let resp = merge_leaf_list_fields(vec![ - vec![entry1.clone()].into_iter(), - vec![entry2.clone()].into_iter(), - ]) - .unwrap(); - assert_eq!(resp, vec![entry1, entry2]); - } - #[test] - fn merge_leaf_list_fields_non_searchable_test() { - let entry1 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let entry2 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: false, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index2".to_string()], - }; - let resp = merge_leaf_list_fields(vec![ - vec![entry1.clone()].into_iter(), - vec![entry2.clone()].into_iter(), - ]) - .unwrap(); - let expected = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: vec!["index2".to_string()], - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string(), "index2".to_string()], - }; - assert_eq!(resp, vec![expected]); - } - #[test] - fn merge_leaf_list_fields_non_aggregatable_test() { - let entry1 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let entry2 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: false, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index2".to_string()], - }; - let resp = merge_leaf_list_fields(vec![ - vec![entry1.clone()].into_iter(), - vec![entry2.clone()].into_iter(), - ]) - .unwrap(); - let expected = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: vec!["index2".to_string()], - index_ids: vec!["index1".to_string(), "index2".to_string()], - }; - assert_eq!(resp, vec![expected]); - } - #[test] - fn merge_leaf_list_fields_mixed_types1() { - let entry1 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let entry2 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let entry3 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::U64 as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let resp = merge_leaf_list_fields(vec![ - vec![entry1.clone(), entry2.clone()].into_iter(), - vec![entry3.clone()].into_iter(), - ]) - .unwrap(); - assert_eq!(resp, vec![entry1.clone(), entry3.clone()]); - } - #[test] - fn merge_leaf_list_fields_mixed_types2() { - let entry1 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let entry2 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let entry3 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::U64 as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let resp = merge_leaf_list_fields(vec![ - vec![entry1.clone(), entry3.clone()].into_iter(), - vec![entry2.clone()].into_iter(), - ]) - .unwrap(); - assert_eq!(resp, vec![entry1.clone(), entry3.clone()]); - } - #[test] - fn merge_leaf_list_fields_multiple_field_names() { - let entry1 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let entry2 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let entry3 = ListFieldsEntryResponse { - field_name: "field2".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index1".to_string()], - }; - let resp = merge_leaf_list_fields(vec![ - vec![entry1.clone(), entry3.clone()].into_iter(), - vec![entry2.clone()].into_iter(), - ]) - .unwrap(); - assert_eq!(resp, vec![entry1.clone(), entry3.clone()]); - } - #[test] - fn merge_leaf_list_fields_non_aggregatable_list_test() { - let entry1 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: vec!["index1".to_string()], - non_aggregatable_index_ids: Vec::new(), - index_ids: vec![ - "index1".to_string(), - "index2".to_string(), - "index3".to_string(), - ], - }; - let entry2 = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: false, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index4".to_string()], - }; - let resp = merge_leaf_list_fields(vec![ - vec![entry1.clone()].into_iter(), - vec![entry2.clone()].into_iter(), - ]) - .unwrap(); - let expected = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: true, - aggregatable: true, - non_searchable_index_ids: vec!["index1".to_string(), "index4".to_string()], - non_aggregatable_index_ids: Vec::new(), - index_ids: vec![ - "index1".to_string(), - "index2".to_string(), - "index3".to_string(), - "index4".to_string(), - ], - }; - assert_eq!(resp, vec![expected]); - } - - #[test] - fn test_field_pattern() { - let prefix_pattern = FieldPattern::from_str("toto*").unwrap(); - assert!(!prefix_pattern.matches("")); - assert!(!prefix_pattern.matches("tot3")); - assert!(!prefix_pattern.matches("atoto")); - assert!(prefix_pattern.matches("toto")); - assert!(prefix_pattern.matches("totowhatever")); - - let suffix_pattern = FieldPattern::from_str("*toto").unwrap(); - assert!(!suffix_pattern.matches("")); - assert!(!suffix_pattern.matches("3tot")); - assert!(!suffix_pattern.matches("totoa")); - assert!(suffix_pattern.matches("toto")); - assert!(suffix_pattern.matches("whatevertoto")); - - let inner_pattern = FieldPattern::from_str("to*ti").unwrap(); - assert!(!inner_pattern.matches("")); - assert!(!inner_pattern.matches("tot")); - assert!(!inner_pattern.matches("totia")); - assert!(!inner_pattern.matches("atoti")); - assert!(inner_pattern.matches("toti")); - assert!(!inner_pattern.matches("tito")); - assert!(inner_pattern.matches("towhateverti")); - - assert!(FieldPattern::from_str("to**").is_err()); - } -} diff --git a/quickwit/quickwit-search/src/list_fields/cache.rs b/quickwit/quickwit-search/src/list_fields/cache.rs new file mode 100644 index 00000000000..ee25da36c5b --- /dev/null +++ b/quickwit/quickwit-search/src/list_fields/cache.rs @@ -0,0 +1,40 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use quickwit_config::CacheConfig; +use quickwit_proto::types::SplitId; +use quickwit_storage::{MemorySizedCache, OwnedBytes}; + +pub struct ListFieldsCache { + cache: MemorySizedCache, +} + +impl ListFieldsCache { + pub fn new(config: &CacheConfig) -> ListFieldsCache { + Self { + cache: MemorySizedCache::from_config( + config, + &quickwit_storage::STORAGE_METRICS.partial_request_cache, + ), + } + } + + pub fn get(&self, split_id: &SplitId) -> Option { + self.cache.get(split_id) + } + + pub fn put(&self, split_id: SplitId, serialized_split_fields: OwnedBytes) { + self.cache.put(split_id, serialized_split_fields); + } +} diff --git a/quickwit/quickwit-search/src/list_fields/leaf.rs b/quickwit/quickwit-search/src/list_fields/leaf.rs new file mode 100644 index 00000000000..2781b5bfc04 --- /dev/null +++ b/quickwit/quickwit-search/src/list_fields/leaf.rs @@ -0,0 +1,373 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::path::Path; +use std::sync::Arc; + +use anyhow::Context; +use futures::stream::{self, StreamExt, TryStreamExt}; +use quickwit_common::shared_consts::{FIELD_PRESENCE_FIELD_NAME, SPLIT_FIELDS_FILE_NAME}; +use quickwit_proto::search::{ + ListFieldsEntry, ListFieldsMetadata, ListFieldsResponse, SplitIdAndFooterOffsets, +}; +use quickwit_proto::types::{IndexId, SplitId}; +use quickwit_storage::{OwnedBytes, Storage}; +use tracing::{Span, instrument}; + +use crate::leaf::open_split_bundle; +use crate::list_fields::patterns::FieldPatterns; +use crate::list_fields::{merge_entries, sort_and_dedup}; +use crate::search_thread_pool; +use crate::service::SearcherContext; + +const DYNAMIC_FIELD_PREFIX: &str = "_dynamic."; + +/// Cap on the number of per-split fields-metadata downloads in flight at once within a single +/// `leaf_list_fields` call. +const MAX_CONCURRENT_DOWNLOADS: usize = 500; + +/// `leaf` step of list fields. +/// +/// Returns field metadata from the assigned splits. +#[instrument(skip_all, fields(index_id = %index_id, num_splits = split_footers.len()))] +pub async fn leaf_list_fields( + index_id: IndexId, + field_patterns_strs: &[String], + split_footers: Vec, + searcher_ctx: Arc, + storage: Arc, +) -> crate::Result { + if split_footers.is_empty() { + return Ok(ListFieldsResponse::default()); + } + let field_patterns = FieldPatterns::from_strs(field_patterns_strs)?; + + let all_entries: Vec> = get_and_process_fields_metadata( + &index_id, + &field_patterns, + split_footers, + searcher_ctx, + storage, + ) + .await?; + + let merged_entries: Vec = merge_fields_metadata(all_entries).await?; + + let response = ListFieldsResponse { + entries: merged_entries, + }; + Ok(response) +} + +/// Two-stage pipeline: download blobs with bounded I/O concurrency, then hand each one off to the +/// CPU pool as soon as it's ready. The CPU stage is left unbounded here because `run_cpu_intensive` +/// already serializes work through the search thread pool. +async fn get_and_process_fields_metadata( + index_id: &str, + field_patterns: &FieldPatterns, + split_footers: Vec, + searcher_ctx: Arc, + storage: Arc, +) -> anyhow::Result>> { + stream::iter(split_footers) + .map(|split_footer| { + let searcher_ctx = searcher_ctx.clone(); + let storage = storage.clone(); + async move { + let serialized_entries: OwnedBytes = + get_fields_metadata(&split_footer, &searcher_ctx, storage).await?; + anyhow::Ok((split_footer.split_id, serialized_entries)) + } + }) + .buffer_unordered(MAX_CONCURRENT_DOWNLOADS) + .map_ok(|(split_id, serialized_entries)| { + process_fields_metadata( + serialized_entries, + index_id.to_string(), + split_id, + field_patterns.clone(), + ) + }) + // `usize::MAX` is fine here: the upstream `buffer_unordered` already bounds the number of + // items that can have completed and be waiting for CPU work to `MAX_CONCURRENT_DOWNLOADS`, + // and `run_cpu_intensive` inside `process_fields_metadata` queues onto the search thread + // pool, so the effective CPU parallelism is the pool size — not this value. + .try_buffer_unordered(usize::MAX) + .try_collect() + .await +} + +/// Gets the raw serialized fields metadata blob for a single split — either from the cache or, +/// on miss, from storage (populating the cache as a side effect). The cache stores the bytes +/// verbatim (proto + zstd, exactly as fetched), so there is no encode/decode round-trip at the +/// cache boundary. +#[instrument(skip_all, fields(split_id = %split_footer.split_id))] +async fn get_fields_metadata( + split_footer: &SplitIdAndFooterOffsets, + searcher_ctx: &SearcherContext, + storage: Arc, +) -> anyhow::Result { + if let Some(serialized_entries) = searcher_ctx.list_fields_cache.get(&split_footer.split_id) { + return Ok(serialized_entries); + } + let serialized_entries = fetch_fields_metadata(searcher_ctx, storage, split_footer).await?; + + searcher_ctx + .list_fields_cache + .put(split_footer.split_id.clone(), serialized_entries.clone()); + + Ok(serialized_entries) +} + +/// Downloads the serialized fields metadata blob for one split. +#[instrument(skip_all, fields(split_id = %split_footer.split_id))] +async fn fetch_fields_metadata( + searcher_ctx: &SearcherContext, + storage: Arc, + split_footer: &SplitIdAndFooterOffsets, +) -> anyhow::Result { + let (_, split_bundle) = open_split_bundle(searcher_ctx, storage, split_footer).await?; + let serialized_entries = split_bundle + .get_all(Path::new(SPLIT_FIELDS_FILE_NAME)) + .await + .with_context(|| { + format!( + "failed to fetch fields metadata for split `{}`", + split_footer.split_id + ) + })?; + Ok(serialized_entries) +} + +/// Dispatches deserialization + filtering onto the CPU-intensive thread pool. With one task per +/// split fanning in here, the pool's workers process splits in parallel. +async fn process_fields_metadata( + serialized_entries: OwnedBytes, + index_id: IndexId, + split_id: SplitId, + field_patterns: FieldPatterns, +) -> anyhow::Result> { + let parent_span = Span::current(); + search_thread_pool() + .run_cpu_intensive(move || { + parent_span.in_scope(|| { + let entries: Vec = + deserialize_fields_metadata(serialized_entries, &index_id, &split_id)?; + let filtered_entries = filter_fields_metadata(entries, &field_patterns); + Ok(filtered_entries) + }) + }) + .await + .context("failed to deserialize and filter fields metadata")? +} + +/// Deserialize the fields metadata blob, strip the field presence entry, +/// rewrite dynamic field names, and enforce the strictly-sorted-by-(name, type) and deduplication +/// invariants. +#[instrument(skip_all, fields(split_id))] +fn deserialize_fields_metadata( + serialized_entries: OwnedBytes, + index_id: &str, + split_id: &str, +) -> anyhow::Result> { + let mut entries = ListFieldsMetadata::deserialize(serialized_entries) + .with_context(|| format!("failed to deserialize fields metadata for split `{split_id}`"))? + .entries; + + entries.retain_mut(|list_field_entry| { + if list_field_entry.field_name == FIELD_PRESENCE_FIELD_NAME { + return false; + } + list_field_entry.index_ids = vec![index_id.to_string()]; + + if list_field_entry + .field_name + .starts_with(DYNAMIC_FIELD_PREFIX) + { + list_field_entry + .field_name + .drain(..DYNAMIC_FIELD_PREFIX.len()); + } + true + }); + // Removing the dynamic prefix may have broken the original ordering, so we re-sort and + // defensively dedup. + sort_and_dedup(&mut entries); + Ok(entries) +} + +/// Filters the list of fields against the given patterns. Order is preserved, so the +/// strictly-sorted-by-(name, type) invariant is maintained. Patterns match against the +/// user-visible field names (i.e. after `DYNAMIC_FIELD_PREFIX` has been stripped). +#[instrument(skip_all)] +fn filter_fields_metadata( + list_fields_entries: Vec, + field_patterns: &FieldPatterns, +) -> Vec { + if field_patterns.is_empty() { + return list_fields_entries; + } + list_fields_entries + .into_iter() + .filter(|entry| field_patterns.matches_any(&entry.field_name)) + .collect() +} + +#[instrument(skip_all, fields(num_splits = all_entries.len()))] +async fn merge_fields_metadata( + all_entries: Vec>, +) -> crate::Result> { + let parent_span = Span::current(); + search_thread_pool() + .run_cpu_intensive(move || parent_span.in_scope(|| merge_entries(all_entries))) + .await + .context("failed to merge single split list fields")? +} + +#[cfg(test)] +mod tests { + use quickwit_proto::search::ListFieldsType; + + use super::*; + + fn entry(field_name: &str) -> ListFieldsEntry { + entry_typed(field_name, ListFieldsType::Str) + } + + fn entry_typed(field_name: &str, field_type: ListFieldsType) -> ListFieldsEntry { + ListFieldsEntry { + field_name: field_name.to_string(), + field_type: field_type as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: Vec::new(), + } + } + + fn serialize(entries: Vec) -> OwnedBytes { + OwnedBytes::new(ListFieldsMetadata { entries }.serialize()) + } + + #[test] + fn deserialize_fields_metadata_strips_field_presence() { + let bytes = serialize(vec![entry(FIELD_PRESENCE_FIELD_NAME), entry("user.name")]); + let fields = deserialize_fields_metadata(bytes, "test-index", "test-split").unwrap(); + assert_eq!(fields.len(), 1); + assert_eq!(fields[0].field_name, "user.name"); + } + + #[test] + fn deserialize_fields_metadata_removes_dynamic_prefix() { + // The serialized blob is sorted by (name, type); `_dynamic.foo` < `user.name`. + let bytes = serialize(vec![entry("user.name"), entry("_dynamic.foo")]); + let fields = deserialize_fields_metadata(bytes, "test-index", "test-split").unwrap(); + let field_names: Vec<&str> = fields + .iter() + .map(|field| field.field_name.as_str()) + .collect(); + assert_eq!(field_names, ["foo", "user.name"]); + } + + #[test] + fn deserialize_fields_metadata_inserts_index_id() { + let bytes = serialize(vec![entry("user.name")]); + let fields = deserialize_fields_metadata(bytes, "test-index", "test-split").unwrap(); + assert_eq!(fields[0].index_ids, ["test-index"]); + } + + #[test] + fn deserialize_fields_metadata_dedups_after_prefix_rewrite() { + // Both `_dynamic.foo` and `foo` collapse to `foo` after stripping the prefix. + let bytes = serialize(vec![entry("_dynamic.foo"), entry("foo")]); + let fields = deserialize_fields_metadata(bytes, "test-index", "test-split").unwrap(); + assert_eq!(fields.len(), 1); + assert_eq!(fields[0].field_name, "foo"); + } + + #[test] + fn deserialize_fields_metadata_dedups_by_name_and_type() { + // Same field name with different types must NOT be deduped; the sort key is (name, type). + let bytes = serialize(vec![ + entry_typed("status", ListFieldsType::U64), + entry_typed("status", ListFieldsType::Bool), + entry("user.name"), + entry_typed("status", ListFieldsType::Str), + ]); + let fields = deserialize_fields_metadata(bytes, "test-index", "test-split").unwrap(); + let field_pairs: Vec<(&str, i32)> = fields + .iter() + .map(|field| (field.field_name.as_str(), field.field_type)) + .collect(); + assert_eq!( + field_pairs, + [ + ("status", ListFieldsType::Str as i32), + ("status", ListFieldsType::U64 as i32), + ("status", ListFieldsType::Bool as i32), + ("user.name", ListFieldsType::Str as i32), + ] + ); + } + + #[test] + fn deserialize_fields_metadata_sorts_by_name_then_type() { + // `_dynamic.b` collapses to `b`, so after the rewrite we should see, in order: + // (`a`, Str), (`b`, Str), (`b`, U64). Build the input out of name-sort order to + // make sure the post-rewrite re-sort is actually doing the work. + let bytes = serialize(vec![ + entry_typed("_dynamic.b", ListFieldsType::U64), + entry_typed("_dynamic.b", ListFieldsType::Str), + entry_typed("a", ListFieldsType::Str), + ]); + let fields = deserialize_fields_metadata(bytes, "test-index", "test-split").unwrap(); + let field_pairs: Vec<(&str, i32)> = fields + .iter() + .map(|field| (field.field_name.as_str(), field.field_type)) + .collect(); + assert_eq!( + field_pairs, + [ + ("a", ListFieldsType::Str as i32), + ("b", ListFieldsType::Str as i32), + ("b", ListFieldsType::U64 as i32), + ] + ); + } + + #[test] + fn filter_fields_metadata_returns_all_when_empty_patterns() { + let fields = vec![entry("a"), entry("b")]; + let patterns = FieldPatterns::from_strs(&[]).unwrap(); + let filtered = filter_fields_metadata(fields.clone(), &patterns); + assert_eq!(filtered, fields); + } + + #[test] + fn filter_fields_metadata_keeps_only_matching_fields() { + let fields = vec![ + entry("service.name"), + entry("user.name"), + entry("service.id"), + ]; + let patterns = FieldPatterns::from_strs(&["service.*".to_string()]).unwrap(); + let filtered = filter_fields_metadata(fields, &patterns); + let field_names: Vec<&str> = filtered + .iter() + .map(|field| field.field_name.as_str()) + .collect(); + assert_eq!(field_names, ["service.name", "service.id"]); + } +} diff --git a/quickwit/quickwit-search/src/list_fields/mod.rs b/quickwit/quickwit-search/src/list_fields/mod.rs new file mode 100644 index 00000000000..9cc81957c95 --- /dev/null +++ b/quickwit/quickwit-search/src/list_fields/mod.rs @@ -0,0 +1,453 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod cache; +mod leaf; +mod patterns; +mod root; + +use std::cmp::Ordering; +use std::sync::LazyLock; + +pub use cache::ListFieldsCache; +use itertools::Itertools; +use quickwit_common::rate_limited_warn; +use quickwit_proto::search::ListFieldsEntry; +use tracing::instrument; + +use crate::SearchError; +pub use crate::list_fields::leaf::leaf_list_fields; +pub use crate::list_fields::root::root_list_fields; + +/// QW_FIELD_LIST_SIZE_LIMIT defines a hard limit on the number of fields that +/// can be returned (error otherwise). +/// +/// Having many fields can happen when a user is creating fields dynamically in +/// a JSON type with random field names. This leads to huge memory consumption +/// when building the response. This is a workaround until a way is found to +/// prune the long tail of rare fields. +static FIELD_LIST_SIZE_LIMIT: LazyLock = + LazyLock::new(|| quickwit_common::get_from_env("QW_FIELD_LIST_SIZE_LIMIT", 100_000, false)); + +// Sorts and deduplicates the list of fields. +// +// If somehow we end up with duplicate fields, only the first one is kept, +// and we log a warning. +#[instrument(skip_all)] +fn sort_and_dedup(entries: &mut Vec) { + entries.sort_unstable_by(|left, right| left.cmp_by_name_and_type(right)); + + // We defensively make sure there are no duplicates here. + entries.dedup_by(|left, right| { + if left.cmp_by_name_and_type(right) == std::cmp::Ordering::Equal { + rate_limited_warn!( + limit_per_min = 1, + field_name = %left.field_name, + field_type = %left.field_type, + "found duplicate fields, please report" + ); + true + } else { + false + } + }); +} + +fn merge_entries(entry_groups: Vec>) -> crate::Result> { + let merged_entries = entry_groups + .into_iter() + .kmerge_by(|left, right| left.cmp_by_name_and_type(right) == Ordering::Less); + let mut entries = Vec::new(); + + let mut current_group: Vec = Vec::new(); + // Build ListFieldsEntry from current group + let flush_group = |responses: &mut Vec<_>, current_group: &mut Vec| { + let entry = merge_same_entry_group(current_group); + responses.push(entry); + current_group.clear(); + }; + for entry in merged_entries { + if let Some(last) = current_group.last() + && (last.field_name != entry.field_name || last.field_type != entry.field_type) + { + flush_group(&mut entries, &mut current_group); + } + if entries.len() >= *FIELD_LIST_SIZE_LIMIT { + return Err(SearchError::Internal(format!( + "list fields response exceeded {} fields", + *FIELD_LIST_SIZE_LIMIT + ))); + } + current_group.push(entry); + } + if !current_group.is_empty() { + flush_group(&mut entries, &mut current_group); + } + Ok(entries) +} + +/// `current_group` needs to contain at least one element. +/// The group needs to be of the same field name and type. +fn merge_same_entry_group(current_group: &mut Vec) -> ListFieldsEntry { + // Make sure all fields have the same name and type in current_group + assert!(!current_group.is_empty()); + assert!( + current_group + .windows(2) + .all(|window| window[0].field_name == window[1].field_name + && window[0].field_type == window[1].field_type) + ); + if current_group.len() == 1 { + return current_group + .pop() + .expect("`current_group` should not be empty"); + } + let metadata = current_group + .last() + .expect("`current_group` should not be empty"); + let searchable = current_group.iter().any(|entry| entry.searchable); + let aggregatable = current_group.iter().any(|entry| entry.aggregatable); + let field_name = metadata.field_name.to_string(); + let field_type = metadata.field_type; + let mut non_searchable_index_ids = if searchable { + // We need to combine the non_searchable_index_ids + index_ids where searchable is set to + // false (as they are all non_searchable) + current_group + .iter() + .flat_map(|entry| { + if !entry.searchable { + entry.index_ids.iter().cloned() + } else { + entry.non_searchable_index_ids.iter().cloned() + } + }) + .collect() + } else { + // Not searchable => no need to list all the indices + Vec::new() + }; + non_searchable_index_ids.sort_unstable(); + non_searchable_index_ids.dedup(); + + let mut non_aggregatable_index_ids = if aggregatable { + // We need to combine the non_aggregatable_index_ids + index_ids where aggregatable is set + // to false (as they are all non_aggregatable) + current_group + .iter() + .flat_map(|entry| { + if !entry.aggregatable { + entry.index_ids.iter().cloned() + } else { + entry.non_aggregatable_index_ids.iter().cloned() + } + }) + .collect() + } else { + // Not aggregatable => no need to list all the indices + Vec::new() + }; + non_aggregatable_index_ids.sort_unstable(); + non_aggregatable_index_ids.dedup(); + + let index_ids: Vec = current_group + .drain(..) + .flat_map(|entry| entry.index_ids.into_iter()) + .sorted_unstable() + .dedup() + .collect(); + + ListFieldsEntry { + field_name, + field_type, + searchable, + aggregatable, + non_searchable_index_ids, + non_aggregatable_index_ids, + index_ids, + } +} + +#[cfg(test)] +mod tests { + use quickwit_proto::search::{ListFieldsEntry, ListFieldsType}; + + use super::*; + + #[test] + fn merge_leaf_list_fields_identical_test() { + let entry1 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_entries(vec![vec![entry1.clone()], vec![entry2.clone()]]).unwrap(); + assert_eq!(resp, vec![entry1]); + } + + #[test] + fn merge_leaf_list_fields_different_test() { + let entry1 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntry { + field_name: "field2".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_entries(vec![vec![entry1.clone()], vec![entry2.clone()]]).unwrap(); + assert_eq!(resp, vec![entry1, entry2]); + } + + #[test] + fn merge_leaf_list_fields_non_searchable_test() { + let entry1 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: false, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index2".to_string()], + }; + let resp = merge_entries(vec![vec![entry1.clone()], vec![entry2.clone()]]).unwrap(); + let expected = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: vec!["index2".to_string()], + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string(), "index2".to_string()], + }; + assert_eq!(resp, vec![expected]); + } + + #[test] + fn merge_leaf_list_fields_non_aggregatable_test() { + let entry1 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: false, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index2".to_string()], + }; + let resp = merge_entries(vec![vec![entry1.clone()], vec![entry2.clone()]]).unwrap(); + let expected = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: vec!["index2".to_string()], + index_ids: vec!["index1".to_string(), "index2".to_string()], + }; + assert_eq!(resp, vec![expected]); + } + + #[test] + fn merge_leaf_list_fields_mixed_types1() { + let entry1 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry3 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::U64 as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_entries(vec![ + vec![entry1.clone(), entry2.clone()], + vec![entry3.clone()], + ]) + .unwrap(); + assert_eq!(resp, vec![entry1.clone(), entry3.clone()]); + } + + #[test] + fn merge_leaf_list_fields_mixed_types2() { + let entry1 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry3 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::U64 as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_entries(vec![ + vec![entry1.clone(), entry3.clone()], + vec![entry2.clone()], + ]) + .unwrap(); + assert_eq!(resp, vec![entry1.clone(), entry3.clone()]); + } + + #[test] + fn merge_leaf_list_fields_multiple_field_names() { + let entry1 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry3 = ListFieldsEntry { + field_name: "field2".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_entries(vec![ + vec![entry1.clone(), entry3.clone()], + vec![entry2.clone()], + ]) + .unwrap(); + assert_eq!(resp, vec![entry1.clone(), entry3.clone()]); + } + + #[test] + fn merge_leaf_list_fields_non_aggregatable_list_test() { + let entry1 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: vec!["index1".to_string()], + non_aggregatable_index_ids: Vec::new(), + index_ids: vec![ + "index1".to_string(), + "index2".to_string(), + "index3".to_string(), + ], + }; + let entry2 = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: false, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index4".to_string()], + }; + let resp = merge_entries(vec![vec![entry1.clone()], vec![entry2.clone()]]).unwrap(); + let expected = ListFieldsEntry { + field_name: "field1".to_string(), + field_type: ListFieldsType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: vec!["index1".to_string(), "index4".to_string()], + non_aggregatable_index_ids: Vec::new(), + index_ids: vec![ + "index1".to_string(), + "index2".to_string(), + "index3".to_string(), + "index4".to_string(), + ], + }; + assert_eq!(resp, vec![expected]); + } +} diff --git a/quickwit/quickwit-search/src/list_fields/patterns.rs b/quickwit/quickwit-search/src/list_fields/patterns.rs new file mode 100644 index 00000000000..75b668355a9 --- /dev/null +++ b/quickwit/quickwit-search/src/list_fields/patterns.rs @@ -0,0 +1,135 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::str::FromStr; +use std::sync::Arc; + +/// A parsed, cheaply-cloneable set of field-name patterns. An empty set matches every field. +#[derive(Clone)] +pub struct FieldPatterns { + patterns: Arc<[FieldPattern]>, +} + +impl FieldPatterns { + /// Parses each input string into a [`FieldPattern`]. Returns an error on invalid input + /// (e.g. more than one wildcard). + pub fn from_strs(field_pattern_strs: &[String]) -> crate::Result { + let patterns: Arc<[FieldPattern]> = field_pattern_strs + .iter() + .map(|pattern_str| FieldPattern::from_str(pattern_str)) + .collect::>>()? + .into(); + Ok(Self { patterns }) + } + + pub fn is_empty(&self) -> bool { + self.patterns.is_empty() + } + + /// Returns true if any of the patterns match the field name. + pub fn matches_any(&self, field_name: &str) -> bool { + self.patterns + .iter() + .any(|pattern| pattern.matches(field_name)) + } +} + +enum FieldPattern { + Match { field: String }, + Wildcard { prefix: String, suffix: String }, +} + +impl FromStr for FieldPattern { + type Err = crate::SearchError; + + fn from_str(field_pattern: &str) -> crate::Result { + match field_pattern.find('*') { + None => Ok(FieldPattern::Match { + field: field_pattern.to_string(), + }), + Some(pos) => { + let prefix = field_pattern[..pos].to_string(); + let suffix = field_pattern[pos + 1..].to_string(); + + if suffix.contains("*") { + return Err(crate::SearchError::InvalidArgument(format!( + "invalid field pattern `{field_pattern}`: only one wildcard is supported" + ))); + } + Ok(FieldPattern::Wildcard { prefix, suffix }) + } + } + } +} + +impl FieldPattern { + fn matches(&self, field_name: &str) -> bool { + match self { + FieldPattern::Match { field } => field == field_name, + FieldPattern::Wildcard { prefix, suffix } => { + field_name.starts_with(prefix) && field_name.ends_with(suffix) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_field_pattern() { + let prefix_pattern = FieldPattern::from_str("toto*").unwrap(); + assert!(!prefix_pattern.matches("")); + assert!(!prefix_pattern.matches("tot3")); + assert!(!prefix_pattern.matches("atoto")); + assert!(prefix_pattern.matches("toto")); + assert!(prefix_pattern.matches("totowhatever")); + + let suffix_pattern = FieldPattern::from_str("*toto").unwrap(); + assert!(!suffix_pattern.matches("")); + assert!(!suffix_pattern.matches("3tot")); + assert!(!suffix_pattern.matches("totoa")); + assert!(suffix_pattern.matches("toto")); + assert!(suffix_pattern.matches("whatevertoto")); + + let inner_pattern = FieldPattern::from_str("to*ti").unwrap(); + assert!(!inner_pattern.matches("")); + assert!(!inner_pattern.matches("tot")); + assert!(!inner_pattern.matches("totia")); + assert!(!inner_pattern.matches("atoti")); + assert!(inner_pattern.matches("toti")); + assert!(!inner_pattern.matches("tito")); + assert!(inner_pattern.matches("towhateverti")); + + assert!(FieldPattern::from_str("to**").is_err()); + } + + #[test] + fn test_field_patterns_empty_matches_nothing_via_explicit_check() { + let patterns = FieldPatterns::from_strs(&[]).unwrap(); + assert!(patterns.is_empty()); + assert!(!patterns.matches_any("anything")); + } + + #[test] + fn test_field_patterns_matches_any() { + let patterns = + FieldPatterns::from_strs(&["service.*".to_string(), "exact".to_string()]).unwrap(); + assert!(!patterns.is_empty()); + assert!(patterns.matches_any("service.name")); + assert!(patterns.matches_any("exact")); + assert!(!patterns.matches_any("other")); + } +} diff --git a/quickwit/quickwit-search/src/list_fields/root.rs b/quickwit/quickwit-search/src/list_fields/root.rs new file mode 100644 index 00000000000..4a6b44c5db0 --- /dev/null +++ b/quickwit/quickwit-search/src/list_fields/root.rs @@ -0,0 +1,208 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::collections::{HashMap, HashSet}; + +use anyhow::Context; +use futures::future::try_join_all; +use quickwit_common::rate_limited_warn; +use quickwit_common::uri::Uri; +use quickwit_config::build_doc_mapper; +use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; +use quickwit_metastore::SplitMetadata; +use quickwit_proto::metastore::MetastoreServiceClient; +use quickwit_proto::search::{ + LeafListFieldsRequest, ListFieldsEntry, ListFieldsRequest, ListFieldsResponse, +}; +use quickwit_proto::types::{IndexId, IndexUid}; +use quickwit_query::query_ast::QueryAst; +use tracing::{Span, instrument}; + +use crate::list_fields::{merge_entries, sort_and_dedup}; +use crate::search_job_placer::group_jobs_by_index_id; +use crate::{ + ClusterClient, SearchError, SearchJob, list_relevant_splits, resolve_index_patterns, + search_thread_pool, +}; + +/// Index metas needed for executing a leaf list fields request. +#[derive(Clone, Debug)] +struct IndexMetasForLeafSearch { + /// Index id. + index_id: IndexId, + /// Index URI. + index_uri: Uri, +} + +/// Performs a distributed list fields request. +/// 1. Sends leaf requests over gRPC to multiple leaf nodes. +/// 2. Merges the search results. +/// 3. Builds the response and returns. +#[instrument(skip_all, fields(index_id_patterns = ?list_fields_req.index_id_patterns))] +pub async fn root_list_fields( + list_fields_req: ListFieldsRequest, + cluster_client: &ClusterClient, + mut metastore: MetastoreServiceClient, +) -> crate::Result { + let indexes_metadata = + resolve_index_patterns(&list_fields_req.index_id_patterns[..], &mut metastore).await?; + + // The request contains a wildcard, but couldn't find any index. + if indexes_metadata.is_empty() { + return Ok(ListFieldsResponse::default()); + } + // Build index metadata map and extract timestamp field for time range refinement + let mut index_uid_to_index_meta: HashMap = HashMap::new(); + let mut index_uids: Vec = Vec::new(); + let mut timestamp_field_opt: Option = None; + + for index_metadata in indexes_metadata { + // Extract timestamp field for time range refinement (use first index's field) + if timestamp_field_opt.is_none() + && list_fields_req.query_ast.is_some() + && let Ok(doc_mapper) = build_doc_mapper( + &index_metadata.index_config.doc_mapping, + &index_metadata.index_config.search_settings, + ) + { + timestamp_field_opt = doc_mapper.timestamp_field_name().map(|s| s.to_string()); + } + let index_metadata_for_leaf_search = IndexMetasForLeafSearch { + index_uri: index_metadata.index_uri().clone(), + index_id: index_metadata.index_config.index_id.to_string(), + }; + index_uids.push(index_metadata.index_uid.clone()); + + index_uid_to_index_meta.insert( + index_metadata.index_uid.clone(), + index_metadata_for_leaf_search, + ); + } + // Extract tags and refine time range from query_ast for split pruning + let mut start_timestamp = list_fields_req.start_timestamp; + let mut end_timestamp = list_fields_req.end_timestamp; + let tags_filter_opt = if let Some(ref query_ast_json) = list_fields_req.query_ast { + let query_ast: QueryAst = serde_json::from_str(query_ast_json) + .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; + + // Refine time range from query AST if timestamp field is available + if let Some(ref timestamp_field) = timestamp_field_opt { + crate::root::refine_start_end_timestamp_from_ast( + &query_ast, + timestamp_field, + &mut start_timestamp, + &mut end_timestamp, + ); + } + extract_tags_from_query(query_ast) + } else { + None + }; + let split_metadatas: Vec = list_relevant_splits( + index_uids, + start_timestamp, + end_timestamp, + tags_filter_opt, + &mut metastore, + ) + .await?; + + // Build requests for each index id + let jobs: Vec = split_metadatas.iter().map(SearchJob::from).collect(); + let assigned_leaf_search_jobs = cluster_client + .search_job_placer + .assign_jobs(jobs, &HashSet::default()) + .await?; + + let mut leaf_request_futures = Vec::new(); + // For each node, forward to a node with an affinity for that index id. + for (client, client_jobs) in assigned_leaf_search_jobs { + let leaf_requests = + jobs_to_leaf_requests(&list_fields_req, &index_uid_to_index_meta, client_jobs)?; + for leaf_request in leaf_requests { + leaf_request_futures + .push(cluster_client.leaf_list_fields(leaf_request, client.clone())); + } + } + let leaf_responses: Vec = try_join_all(leaf_request_futures).await?; + let leaf_entries: Vec> = leaf_responses + .into_iter() + .map(|response| response.entries) + .collect(); + let merged_entries = merge_fields_metadata(leaf_entries).await?; + let response = ListFieldsResponse { + entries: merged_entries, + }; + Ok(response) +} + +/// Builds a list of [`LeafListFieldsRequest`], one per index, from a list of [`SearchJob`]. +fn jobs_to_leaf_requests( + request: &ListFieldsRequest, + index_uid_to_id: &HashMap, + jobs: Vec, +) -> crate::Result> { + let search_request_for_leaf = request.clone(); + let mut leaf_search_requests = Vec::new(); + // Group jobs by index uid. + group_jobs_by_index_id(jobs, |job_group| { + let index_uid = &job_group[0].index_uid; + let index_meta = index_uid_to_id.get(index_uid).ok_or_else(|| { + SearchError::Internal(format!( + "received list fields job for an unknown index {index_uid}. it should never happen" + )) + })?; + + let leaf_search_request = LeafListFieldsRequest { + index_id: index_meta.index_id.to_string(), + index_uri: index_meta.index_uri.to_string(), + field_patterns: search_request_for_leaf.field_patterns.clone(), + split_offsets: job_group.into_iter().map(|job| job.offsets).collect(), + }; + leaf_search_requests.push(leaf_search_request); + Ok(()) + })?; + + Ok(leaf_search_requests) +} + +#[instrument(skip_all, fields(num_leaves = entry_groups.len()))] +async fn merge_fields_metadata( + mut entry_groups: Vec>, +) -> crate::Result> { + let parent_span = Span::current(); + search_thread_pool() + .run_cpu_intensive(move || { + parent_span.in_scope(|| { + for entry_group in &mut entry_groups { + if !entry_group.is_sorted_by(|left, right| { + // Checking on less ensure that this is both sorted AND that there are no + // duplicates + left.cmp_by_name_and_type(right) == Ordering::Less + }) { + rate_limited_warn!( + limit_per_min = 1, + "fields metadata returned by a leaf are not strictly sorted, please \ + report" + ); + sort_and_dedup(entry_group); + } + } + merge_entries(entry_groups) + }) + }) + .await + .context("failed to merge single split list fields")? +} diff --git a/quickwit/quickwit-search/src/list_fields_cache.rs b/quickwit/quickwit-search/src/list_fields_cache.rs deleted file mode 100644 index 531e1015e4e..00000000000 --- a/quickwit/quickwit-search/src/list_fields_cache.rs +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2021-Present Datadog, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use quickwit_config::CacheConfig; -use quickwit_proto::search::{ - ListFields, SplitIdAndFooterOffsets, deserialize_split_fields, serialize_split_fields, -}; -use quickwit_proto::types::SplitId; -use quickwit_storage::{MemorySizedCache, OwnedBytes}; - -/// A cache to memoize `leaf_search_single_split` results. -pub struct ListFieldsCache { - content: MemorySizedCache, -} - -// TODO For now this simply caches the whole ListFieldsEntryResponse. We could -// be more clever and cache aggregates instead. -impl ListFieldsCache { - pub fn new(config: &CacheConfig) -> ListFieldsCache { - ListFieldsCache { - content: MemorySizedCache::from_config( - config, - &quickwit_storage::STORAGE_METRICS.partial_request_cache, - ), - } - } - pub fn get(&self, split_info: SplitIdAndFooterOffsets) -> Option { - let key = CacheKey::from_split_meta(split_info); - let encoded_result = self.content.get(&key)?; - // this should never fail - deserialize_split_fields(encoded_result).ok() - } - - pub fn put(&self, split_info: SplitIdAndFooterOffsets, list_fields: ListFields) { - let key = CacheKey::from_split_meta(split_info); - - let encoded_result = serialize_split_fields(list_fields); - self.content.put(key, OwnedBytes::new(encoded_result)); - } -} - -/// A key inside a [`ListFieldsCache`]. -#[derive(Debug, Hash, Clone, PartialEq, Eq)] -struct CacheKey { - /// The split this entry refers to - split_id: SplitId, -} - -impl CacheKey { - fn from_split_meta(split_info: SplitIdAndFooterOffsets) -> Self { - CacheKey { - split_id: split_info.split_id, - } - } -} - -#[cfg(test)] -mod tests { - use bytesize::ByteSize; - use quickwit_proto::search::{ - ListFieldType, ListFields, ListFieldsEntryResponse, SplitIdAndFooterOffsets, - }; - - use super::ListFieldsCache; - - #[test] - fn test_list_fields_cache() { - let cache = ListFieldsCache::new(&ByteSize::mb(64).into()); - - let split_1 = SplitIdAndFooterOffsets { - split_id: "split_1".to_string(), - split_footer_start: 0, - split_footer_end: 100, - timestamp_start: None, - timestamp_end: None, - num_docs: 0, - }; - - let split_2 = SplitIdAndFooterOffsets { - split_id: "split_2".to_string(), - split_footer_start: 0, - split_footer_end: 100, - timestamp_start: None, - timestamp_end: None, - num_docs: 0, - }; - - let result = ListFieldsEntryResponse { - field_name: "field1".to_string(), - field_type: ListFieldType::Str as i32, - searchable: false, - aggregatable: true, - non_searchable_index_ids: Vec::new(), - non_aggregatable_index_ids: Vec::new(), - index_ids: vec!["index4".to_string()], - }; - - assert!(cache.get(split_1.clone()).is_none()); - - let list_fields = ListFields { - fields: vec![result.clone()], - }; - - cache.put(split_1.clone(), list_fields.clone()); - assert_eq!(cache.get(split_1.clone()).unwrap(), list_fields); - assert!(cache.get(split_2).is_none()); - } -} diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 3b08e50e480..b0ea4e5f374 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -36,8 +36,7 @@ use tantivy::aggregation::AggregationLimitsGuard; use crate::invoker::LambdaLeafSearchInvoker; use crate::leaf::multi_index_leaf_search; use crate::leaf_cache::{LeafSearchCache, PredicateCacheImpl}; -use crate::list_fields::{leaf_list_fields, root_list_fields}; -use crate::list_fields_cache::ListFieldsCache; +use crate::list_fields::{ListFieldsCache, leaf_list_fields, root_list_fields}; use crate::list_terms::{leaf_list_terms, root_list_terms}; use crate::metrics_trackers::LeafSearchMetricsFuture; use crate::root::fetch_docs_phase; @@ -252,7 +251,7 @@ impl SearchService for SearchServiceImpl { let leaf_search_response = leaf_list_terms( self.searcher_context.clone(), &search_request, - storage.clone(), + storage, &split_ids[..], ) .await?; @@ -305,10 +304,10 @@ impl SearchService for SearchServiceImpl { let split_ids = list_fields_req.split_offsets; leaf_list_fields( index_id, + &list_fields_req.field_patterns, + split_ids, + self.searcher_context.clone(), storage, - &self.searcher_context, - &split_ids[..], - &list_fields_req.fields, ) .await } @@ -419,7 +418,7 @@ pub struct SearcherContext { pub predicate_cache: Arc, /// Search split cache. `None` if no split cache is configured. pub split_cache_opt: Option>, - /// List fields cache. Caches the list fields response for a given split. + /// List fields cache. Caches the raw fields-metadata blob for a given split. pub list_fields_cache: ListFieldsCache, /// The aggregation limits are passed to limit the memory usage. /// This object is shared across all request. diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/field_capability.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/field_capability.rs index 9aefdc83762..292f7202644 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/field_capability.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/field_capability.rs @@ -14,10 +14,12 @@ use std::collections::HashMap; -use quickwit_proto::search::{ListFieldType, ListFieldsEntryResponse, ListFieldsResponse}; +use itertools::Itertools; +use quickwit_proto::search::{ListFieldsEntry, ListFieldsResponse, ListFieldsType}; use quickwit_query::ElasticQueryDsl; use quickwit_query::query_ast::QueryAst; use serde::{Deserialize, Serialize}; +use tracing::instrument; use warp::hyper::StatusCode; use super::ElasticsearchError; @@ -110,7 +112,7 @@ struct FieldCapabilityEntryResponse { non_searchable_indices: Vec, // [ "index1" ] } impl FieldCapabilityEntryResponse { - fn from_list_field_entry_response(entry: ListFieldsEntryResponse) -> Self { + fn from_list_fields_entry(entry: ListFieldsEntry) -> Self { Self { metadata_field: false, searchable: entry.searchable, @@ -123,42 +125,43 @@ impl FieldCapabilityEntryResponse { } } +#[instrument(name = "build_field_capabilities_response", skip_all, fields(num_fields = response.entries.len()))] pub fn convert_to_es_field_capabilities_response( - resp: ListFieldsResponse, + response: ListFieldsResponse, ) -> FieldCapabilityResponse { - let mut indices = resp - .fields + let indices: Vec = response + .entries .iter() .flat_map(|entry| entry.index_ids.iter().cloned()) - .collect::>(); - indices.sort(); - indices.dedup(); + .sorted() + .dedup() + .collect(); let mut fields: HashMap = HashMap::new(); - for list_field_resp in resp.fields { + + for list_fields_entry in response.entries { let entry = fields - .entry(list_field_resp.field_name.to_string()) + .entry(list_fields_entry.field_name.to_string()) .or_default(); - let field_type = ListFieldType::try_from(list_field_resp.field_type).unwrap(); - let add_entry = - FieldCapabilityEntryResponse::from_list_field_entry_response(list_field_resp); + let field_type = list_fields_entry.field_type(); + let add_entry = FieldCapabilityEntryResponse::from_list_fields_entry(list_fields_entry); let types = match field_type { - ListFieldType::Str => { + ListFieldsType::Str => { vec![ FieldCapabilityEntryType::Keyword, FieldCapabilityEntryType::Text, ] } - ListFieldType::U64 => vec![FieldCapabilityEntryType::Long], - ListFieldType::I64 => vec![FieldCapabilityEntryType::Long], - ListFieldType::F64 => vec![FieldCapabilityEntryType::Double], - ListFieldType::Bool => vec![FieldCapabilityEntryType::Boolean], - ListFieldType::Date => vec![FieldCapabilityEntryType::DateNanos], - ListFieldType::Facet => continue, - ListFieldType::Json => continue, - ListFieldType::Bytes => vec![FieldCapabilityEntryType::Binary], - ListFieldType::IpAddr => vec![FieldCapabilityEntryType::Ip], + ListFieldsType::U64 => vec![FieldCapabilityEntryType::Long], + ListFieldsType::I64 => vec![FieldCapabilityEntryType::Long], + ListFieldsType::F64 => vec![FieldCapabilityEntryType::Double], + ListFieldsType::Bool => vec![FieldCapabilityEntryType::Boolean], + ListFieldsType::Date => vec![FieldCapabilityEntryType::DateNanos], + ListFieldsType::Facet => continue, + ListFieldsType::Json => continue, + ListFieldsType::Bytes => vec![FieldCapabilityEntryType::Binary], + ListFieldsType::IpAddr => vec![FieldCapabilityEntryType::Ip], }; for field_type in types { let mut add_entry = add_entry.clone(); @@ -222,7 +225,7 @@ pub fn build_list_field_request_for_es_api( Ok(quickwit_proto::search::ListFieldsRequest { index_id_patterns, - fields: search_params.fields.unwrap_or_default(), + field_patterns: search_params.fields.unwrap_or_default(), start_timestamp: search_params.start_timestamp, end_timestamp: search_params.end_timestamp, query_ast: query_ast_json, @@ -362,8 +365,8 @@ mod tests { .unwrap(); assert_eq!( - result.fields, - vec!["field1".to_string(), "field2".to_string()] + result.field_patterns, + ["field1".to_string(), "field2".to_string()] ); assert_eq!(result.start_timestamp, Some(1000)); assert_eq!(result.end_timestamp, Some(2000)); diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/mappings.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/mappings.rs index c639ed3daa9..ad09bf67243 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/mappings.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/mappings.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use quickwit_doc_mapper::{FieldMappingEntry, FieldMappingType}; use quickwit_metastore::IndexMetadata; -use quickwit_proto::search::{ListFieldType, ListFieldsResponse}; +use quickwit_proto::search::{ListFieldsResponse, ListFieldsType}; use serde::ser::SerializeMap; use serde::{Serialize, Serializer}; @@ -131,7 +131,7 @@ fn merge_dynamic_fields( properties: &mut HashMap, list_fields_response: &ListFieldsResponse, ) { - for field_entry in &list_fields_response.fields { + for field_entry in &list_fields_response.entries { let field_name = &field_entry.field_name; if field_name.starts_with('_') { continue; @@ -139,7 +139,7 @@ fn merge_dynamic_fields( if properties.contains_key(field_name) { continue; } - let Ok(field_type) = ListFieldType::try_from(field_entry.field_type) else { + let Ok(field_type) = ListFieldsType::try_from(field_entry.field_type) else { continue; }; if let Some(es_type) = es_type_from_list_field_type(field_type) { @@ -148,16 +148,16 @@ fn merge_dynamic_fields( } } -fn es_type_from_list_field_type(field_type: ListFieldType) -> Option<&'static str> { +fn es_type_from_list_field_type(field_type: ListFieldsType) -> Option<&'static str> { match field_type { - ListFieldType::Str => Some("keyword"), - ListFieldType::U64 | ListFieldType::I64 => Some("long"), - ListFieldType::F64 => Some("double"), - ListFieldType::Bool => Some("boolean"), - ListFieldType::Date => Some("date"), - ListFieldType::Bytes => Some("binary"), - ListFieldType::IpAddr => Some("ip"), - ListFieldType::Facet | ListFieldType::Json => None, + ListFieldsType::Str => Some("keyword"), + ListFieldsType::U64 | ListFieldsType::I64 => Some("long"), + ListFieldsType::F64 => Some("double"), + ListFieldsType::Bool => Some("boolean"), + ListFieldsType::Date => Some("date"), + ListFieldsType::Bytes => Some("binary"), + ListFieldsType::IpAddr => Some("ip"), + ListFieldsType::Facet | ListFieldsType::Json => None, } } @@ -274,26 +274,26 @@ mod tests { #[test] fn test_merge_dynamic_fields_skips_existing_and_internal() { - use quickwit_proto::search::ListFieldsEntryResponse; + use quickwit_proto::search::ListFieldsEntry; let mut properties = HashMap::new(); properties.insert("title".to_string(), FieldMapping::Leaf { typ: "text" }); let list_fields = ListFieldsResponse { - fields: vec![ - ListFieldsEntryResponse { + entries: vec![ + ListFieldsEntry { field_name: "title".to_string(), - field_type: ListFieldType::Str as i32, + field_type: ListFieldsType::Str as i32, ..Default::default() }, - ListFieldsEntryResponse { + ListFieldsEntry { field_name: "_timestamp".to_string(), - field_type: ListFieldType::Date as i32, + field_type: ListFieldsType::Date as i32, ..Default::default() }, - ListFieldsEntryResponse { + ListFieldsEntry { field_name: "dynamic_field".to_string(), - field_type: ListFieldType::Str as i32, + field_type: ListFieldsType::Str as i32, ..Default::default() }, ], diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index 6242d3b85ca..9f4a83f357f 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -216,7 +216,7 @@ pub(crate) async fn es_compat_index_mapping( .collect(); let list_fields_request = quickwit_proto::search::ListFieldsRequest { index_id_patterns, - fields: Vec::new(), + field_patterns: Vec::new(), start_timestamp: None, end_timestamp: None, query_ast: None,