diff --git a/Cargo.lock b/Cargo.lock index 6d605d6..838c2b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -765,7 +765,7 @@ dependencies = [ [[package]] name = "deltalake" version = "0.4.1" -source = "git+https://github.com/delta-io/delta-rs.git?rev=61e2941cc5787ac2028efea271a54926f9c45cec#61e2941cc5787ac2028efea271a54926f9c45cec" +source = "git+https://github.com/delta-io/delta-rs.git?rev=2a0d3632f44a5ffa8cf6bf953615699547856719#2a0d3632f44a5ffa8cf6bf953615699547856719" dependencies = [ "anyhow", "arrow", @@ -774,6 +774,7 @@ dependencies = [ "cfg-if 1.0.0", "chrono", "clap", + "datafusion", "env_logger", "errno", "futures", @@ -3323,3 +3324,8 @@ dependencies = [ "cc", "libc", ] + +[[patch.unused]] +name = "deltalake" +version = "0.4.1" +source = "git+https://github.com/delta-io/delta-rs.git?rev=61e2941cc5787ac2028efea271a54926f9c45cec#61e2941cc5787ac2028efea271a54926f9c45cec" diff --git a/columnq/Cargo.toml b/columnq/Cargo.toml index d78024e..8eed9d0 100644 --- a/columnq/Cargo.toml +++ b/columnq/Cargo.toml @@ -36,7 +36,9 @@ rusoto_s3 = { version = "0.46" } rusoto_credential = { version = "0.46" } rusoto_sts = { version = "0.46" } -deltalake = { version = "0", features = ["s3"] } + +deltalake = { git = "https://github.com/delta-io/delta-rs.git", rev = "2a0d3632f44a5ffa8cf6bf953615699547856719", features = ["s3", "datafusion-ext"] } +# deltalake = { version = "0", features = ["s3", "datafusion-ext"] } [dev-dependencies] anyhow = "1" diff --git a/columnq/src/table/delta.rs b/columnq/src/table/delta.rs index f0efcb6..7a9a648 100644 --- a/columnq/src/table/delta.rs +++ b/columnq/src/table/delta.rs @@ -4,13 +4,54 @@ use std::sync::Arc; use datafusion::arrow; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::TableProvider; use datafusion::parquet::arrow::{ArrowReader, ParquetFileArrowReader}; use datafusion::parquet::file::reader::SerializedFileReader; use datafusion::parquet::file::serialized_reader::SliceableCursor; use crate::error::ColumnQError; use crate::io; -use crate::table::TableSource; +use crate::table::{TableLoadOption, TableOptionDelta, TableSource}; +use deltalake; + +pub async fn to_datafusion_table(t: &TableSource) -> Result, ColumnQError> { + let opt = t + .option + .clone() + .unwrap_or_else(|| TableLoadOption::delta(TableOptionDelta::default())); + + let TableOptionDelta { use_memory_table } = opt.as_delta()?; + + let uri_str = t.get_uri_str(); + let delta_table = deltalake::open_table(uri_str).await?; + let parsed_uri = t.parsed_uri()?; + let blob_type = io::BlobStoreType::try_from(parsed_uri.scheme())?; + + if *use_memory_table { + to_mem_table(delta_table, blob_type).await + } else { + to_delta_table(delta_table, blob_type).await + } +} + +pub async fn to_delta_table( + delta_table: deltalake::DeltaTable, + blob_type: io::BlobStoreType, +) -> Result, ColumnQError> { + match blob_type { + io::BlobStoreType::FileSystem => Ok(Arc::new(delta_table)), + io::BlobStoreType::S3 => Err(ColumnQError::LoadDelta(format!( + "S3 for delta table currently only supported in conjunction with `to_memory_table` config: {}", + delta_table.table_uri, + ))), + _ => { + return Err(ColumnQError::InvalidUri(format!( + "Scheme in table uri not supported for delta table: {}", + delta_table.table_uri, + ))); + } + } +} fn read_partition(mut r: R, batch_size: usize) -> Result, ColumnQError> { let mut buffer = Vec::new(); @@ -32,21 +73,17 @@ fn read_partition(mut r: R, batch_size: usize) -> Result Result { + delta_table: deltalake::DeltaTable, + blob_type: io::BlobStoreType, +) -> Result, ColumnQError> { // TODO: make batch size configurable let batch_size = 1024; - let uri_str = t.get_uri_str(); - let delta_table = deltalake::open_table(uri_str).await?; - if delta_table.get_files().is_empty() { return Err(ColumnQError::LoadDelta("empty delta table".to_string())); } let delta_schema = delta_table.get_schema()?; - let uri = t.parsed_uri()?; - let blob_type = io::BlobStoreType::try_from(uri.scheme())?; let paths = delta_table.get_file_uris(); let path_iter = paths.iter().map(|s| s.as_str()); @@ -70,13 +107,73 @@ pub async fn to_mem_table( _ => { return Err(ColumnQError::InvalidUri(format!( "Scheme in table uri not supported for delta table: {}", - uri_str, + delta_table.table_uri, ))); } }; - Ok(datafusion::datasource::MemTable::try_new( + Ok(Arc::new(datafusion::datasource::MemTable::try_new( Arc::new(delta_schema.try_into()?), partitions, - )?) + )?)) +} + +#[cfg(test)] +mod tests { + + use super::*; + use datafusion::datasource::datasource::Statistics; + use datafusion::datasource::MemTable; + + use deltalake::DeltaTable; + + use crate::error::ColumnQError; + use crate::test_util::test_data_path; + + #[tokio::test] + async fn load_delta_as_memtable() -> Result<(), ColumnQError> { + let t = to_datafusion_table( + &TableSource::new("blogs".to_string(), test_data_path("blogs-delta")).with_option( + TableLoadOption::delta(TableOptionDelta { + use_memory_table: true, + }), + ), + ) + .await?; + + validate_statistics(t.statistics()); + + match t.as_any().downcast_ref::() { + Some(_) => Ok(()), + None => panic!("must be of type datafusion::datasource::MemTable"), + } + } + + #[tokio::test] + async fn load_delta_as_delta_source() -> Result<(), ColumnQError> { + let t = to_datafusion_table( + &TableSource::new("blogs".to_string(), test_data_path("blogs-delta")).with_option( + TableLoadOption::delta(TableOptionDelta { + use_memory_table: false, + }), + ), + ) + .await?; + + match t.as_any().downcast_ref::() { + Some(delta_table) => { + assert_eq!(delta_table.version, 0); + Ok(()) + } + None => panic!("must be of type deltalake::DeltaTable"), + } + } + + fn validate_statistics(stats: Statistics) { + assert_eq!(stats.num_rows, Some(500)); + let column_stats = stats.column_statistics.unwrap(); + assert_eq!(column_stats[0].null_count, Some(245)); + assert_eq!(column_stats[1].null_count, Some(373)); + assert_eq!(column_stats[2].null_count, Some(237)); + } } diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index bef2381..8638a3e 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -145,6 +145,27 @@ impl Default for TableOptionParquet { } } +#[derive(Deserialize, Debug, Clone, Eq, PartialEq)] +pub struct TableOptionDelta { + #[serde(default = "TableOptionDelta::default_use_memory_table")] + use_memory_table: bool, +} + +impl TableOptionDelta { + #[inline] + pub fn default_use_memory_table() -> bool { + true + } +} + +impl Default for TableOptionDelta { + fn default() -> Self { + Self { + use_memory_table: Self::default_use_memory_table(), + } + } +} + // Adding new table format: // * update TableLoadOption enum to add the new variant // * update TableLoadOption.extension @@ -165,7 +186,7 @@ pub enum TableLoadOption { ndjson {}, parquet(TableOptionParquet), google_spreadsheet(TableOptionGoogleSpreasheet), - delta {}, + delta(TableOptionDelta), arrow {}, arrows {}, } @@ -194,6 +215,13 @@ impl TableLoadOption { } } + fn as_delta(&self) -> Result<&TableOptionDelta, ColumnQError> { + match self { + Self::delta(opt) => Ok(opt), + _ => Err(ColumnQError::ExpectFormatOption("delta".to_string())), + } + } + pub fn extension<'a>(&'a self) -> &'static str { match self { Self::json { .. } => "json", @@ -352,7 +380,7 @@ pub async fn load(t: &TableSource) -> Result, ColumnQErro TableLoadOption::google_spreadsheet(_) => { Arc::new(google_spreadsheets::to_mem_table(t).await?) } - TableLoadOption::delta { .. } => Arc::new(delta::to_mem_table(t).await?), + TableLoadOption::delta { .. } => delta::to_datafusion_table(t).await?, TableLoadOption::arrow { .. } => Arc::new(arrow_ipc_file::to_mem_table(t).await?), TableLoadOption::arrows { .. } => Arc::new(arrow_ipc_stream::to_mem_table(t).await?), }) diff --git a/columnq/src/table/parquet.rs b/columnq/src/table/parquet.rs index 8fe03d6..3fbd1d1 100644 --- a/columnq/src/table/parquet.rs +++ b/columnq/src/table/parquet.rs @@ -114,7 +114,7 @@ mod tests { match t.as_any().downcast_ref::() { Some(_) => Ok(()), - None => panic!("not read a datafusion::ParquetTable"), + None => panic!("must be of type datafusion::datasource::parquet::ParquetTable"), } } diff --git a/test_data/blogs-delta/.part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet.crc b/test_data/blogs-delta/.part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet.crc new file mode 100644 index 0000000..0b07196 Binary files /dev/null and b/test_data/blogs-delta/.part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet.crc differ diff --git a/test_data/blogs-delta/_delta_log/00000000000000000000.json b/test_data/blogs-delta/_delta_log/00000000000000000000.json new file mode 100644 index 0000000..ebf5936 --- /dev/null +++ b/test_data/blogs-delta/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"commitInfo":{"timestamp":1630864087805,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"4750","numOutputRows":"500"}}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"373e4420-3d29-4f0b-9ec0-0427dd6f9c68","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"reply_id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"next_id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"blog_id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1630864087072}} +{"add":{"path":"part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet","partitionValues":{},"size":4750,"modificationTime":1630864087000,"dataChange":true}} diff --git a/test_data/blogs-delta/part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet b/test_data/blogs-delta/part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet new file mode 100644 index 0000000..575b33b Binary files /dev/null and b/test_data/blogs-delta/part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet differ