From ea84099b0718b993427c47ba45a49d185489594a Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Mon, 6 Sep 2021 09:56:55 +0200 Subject: [PATCH] Lazy load delta: Support for large tables (#71) * Allow for delta tables to be directly backed by storage. Enables experimental support for delta tables that are too large to be stored in memory. We directly expose `DeltaTable` instead of copying the data into a datafusion::Memtable. Disadvantages: - in the new mode, no support for S3 - as we're relying on datafusion to handle the parquet files directly, nested schemas and certain data types may not work properly. --- Cargo.lock | 8 +- columnq/Cargo.toml | 4 +- columnq/src/table/delta.rs | 119 ++++++++++++++++-- columnq/src/table/mod.rs | 32 ++++- columnq/src/table/parquet.rs | 2 +- ...-aa68-b3542389889a-c000.snappy.parquet.crc | Bin 0 -> 48 bytes .../_delta_log/00000000000000000000.json | 4 + ...46ac-aa68-b3542389889a-c000.snappy.parquet | Bin 0 -> 4750 bytes 8 files changed, 153 insertions(+), 16 deletions(-) create mode 100644 test_data/blogs-delta/.part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet.crc create mode 100644 test_data/blogs-delta/_delta_log/00000000000000000000.json create mode 100644 test_data/blogs-delta/part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet diff --git a/Cargo.lock b/Cargo.lock index 6d605d6..838c2b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -765,7 +765,7 @@ dependencies = [ [[package]] name = "deltalake" version = "0.4.1" -source = "git+https://github.com/delta-io/delta-rs.git?rev=61e2941cc5787ac2028efea271a54926f9c45cec#61e2941cc5787ac2028efea271a54926f9c45cec" +source = "git+https://github.com/delta-io/delta-rs.git?rev=2a0d3632f44a5ffa8cf6bf953615699547856719#2a0d3632f44a5ffa8cf6bf953615699547856719" dependencies = [ "anyhow", "arrow", @@ -774,6 +774,7 @@ dependencies = [ "cfg-if 1.0.0", "chrono", "clap", + "datafusion", "env_logger", "errno", "futures", @@ -3323,3 +3324,8 @@ dependencies = [ "cc", "libc", ] + +[[patch.unused]] +name = "deltalake" +version = "0.4.1" +source = "git+https://github.com/delta-io/delta-rs.git?rev=61e2941cc5787ac2028efea271a54926f9c45cec#61e2941cc5787ac2028efea271a54926f9c45cec" diff --git a/columnq/Cargo.toml b/columnq/Cargo.toml index d78024e..8eed9d0 100644 --- a/columnq/Cargo.toml +++ b/columnq/Cargo.toml @@ -36,7 +36,9 @@ rusoto_s3 = { version = "0.46" } rusoto_credential = { version = "0.46" } rusoto_sts = { version = "0.46" } -deltalake = { version = "0", features = ["s3"] } + +deltalake = { git = "https://github.com/delta-io/delta-rs.git", rev = "2a0d3632f44a5ffa8cf6bf953615699547856719", features = ["s3", "datafusion-ext"] } +# deltalake = { version = "0", features = ["s3", "datafusion-ext"] } [dev-dependencies] anyhow = "1" diff --git a/columnq/src/table/delta.rs b/columnq/src/table/delta.rs index f0efcb6..7a9a648 100644 --- a/columnq/src/table/delta.rs +++ b/columnq/src/table/delta.rs @@ -4,13 +4,54 @@ use std::sync::Arc; use datafusion::arrow; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::TableProvider; use datafusion::parquet::arrow::{ArrowReader, ParquetFileArrowReader}; use datafusion::parquet::file::reader::SerializedFileReader; use datafusion::parquet::file::serialized_reader::SliceableCursor; use crate::error::ColumnQError; use crate::io; -use crate::table::TableSource; +use crate::table::{TableLoadOption, TableOptionDelta, TableSource}; +use deltalake; + +pub async fn to_datafusion_table(t: &TableSource) -> Result, ColumnQError> { + let opt = t + .option + .clone() + .unwrap_or_else(|| TableLoadOption::delta(TableOptionDelta::default())); + + let TableOptionDelta { use_memory_table } = opt.as_delta()?; + + let uri_str = t.get_uri_str(); + let delta_table = deltalake::open_table(uri_str).await?; + let parsed_uri = t.parsed_uri()?; + let blob_type = io::BlobStoreType::try_from(parsed_uri.scheme())?; + + if *use_memory_table { + to_mem_table(delta_table, blob_type).await + } else { + to_delta_table(delta_table, blob_type).await + } +} + +pub async fn to_delta_table( + delta_table: deltalake::DeltaTable, + blob_type: io::BlobStoreType, +) -> Result, ColumnQError> { + match blob_type { + io::BlobStoreType::FileSystem => Ok(Arc::new(delta_table)), + io::BlobStoreType::S3 => Err(ColumnQError::LoadDelta(format!( + "S3 for delta table currently only supported in conjunction with `to_memory_table` config: {}", + delta_table.table_uri, + ))), + _ => { + return Err(ColumnQError::InvalidUri(format!( + "Scheme in table uri not supported for delta table: {}", + delta_table.table_uri, + ))); + } + } +} fn read_partition(mut r: R, batch_size: usize) -> Result, ColumnQError> { let mut buffer = Vec::new(); @@ -32,21 +73,17 @@ fn read_partition(mut r: R, batch_size: usize) -> Result Result { + delta_table: deltalake::DeltaTable, + blob_type: io::BlobStoreType, +) -> Result, ColumnQError> { // TODO: make batch size configurable let batch_size = 1024; - let uri_str = t.get_uri_str(); - let delta_table = deltalake::open_table(uri_str).await?; - if delta_table.get_files().is_empty() { return Err(ColumnQError::LoadDelta("empty delta table".to_string())); } let delta_schema = delta_table.get_schema()?; - let uri = t.parsed_uri()?; - let blob_type = io::BlobStoreType::try_from(uri.scheme())?; let paths = delta_table.get_file_uris(); let path_iter = paths.iter().map(|s| s.as_str()); @@ -70,13 +107,73 @@ pub async fn to_mem_table( _ => { return Err(ColumnQError::InvalidUri(format!( "Scheme in table uri not supported for delta table: {}", - uri_str, + delta_table.table_uri, ))); } }; - Ok(datafusion::datasource::MemTable::try_new( + Ok(Arc::new(datafusion::datasource::MemTable::try_new( Arc::new(delta_schema.try_into()?), partitions, - )?) + )?)) +} + +#[cfg(test)] +mod tests { + + use super::*; + use datafusion::datasource::datasource::Statistics; + use datafusion::datasource::MemTable; + + use deltalake::DeltaTable; + + use crate::error::ColumnQError; + use crate::test_util::test_data_path; + + #[tokio::test] + async fn load_delta_as_memtable() -> Result<(), ColumnQError> { + let t = to_datafusion_table( + &TableSource::new("blogs".to_string(), test_data_path("blogs-delta")).with_option( + TableLoadOption::delta(TableOptionDelta { + use_memory_table: true, + }), + ), + ) + .await?; + + validate_statistics(t.statistics()); + + match t.as_any().downcast_ref::() { + Some(_) => Ok(()), + None => panic!("must be of type datafusion::datasource::MemTable"), + } + } + + #[tokio::test] + async fn load_delta_as_delta_source() -> Result<(), ColumnQError> { + let t = to_datafusion_table( + &TableSource::new("blogs".to_string(), test_data_path("blogs-delta")).with_option( + TableLoadOption::delta(TableOptionDelta { + use_memory_table: false, + }), + ), + ) + .await?; + + match t.as_any().downcast_ref::() { + Some(delta_table) => { + assert_eq!(delta_table.version, 0); + Ok(()) + } + None => panic!("must be of type deltalake::DeltaTable"), + } + } + + fn validate_statistics(stats: Statistics) { + assert_eq!(stats.num_rows, Some(500)); + let column_stats = stats.column_statistics.unwrap(); + assert_eq!(column_stats[0].null_count, Some(245)); + assert_eq!(column_stats[1].null_count, Some(373)); + assert_eq!(column_stats[2].null_count, Some(237)); + } } diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index bef2381..8638a3e 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -145,6 +145,27 @@ impl Default for TableOptionParquet { } } +#[derive(Deserialize, Debug, Clone, Eq, PartialEq)] +pub struct TableOptionDelta { + #[serde(default = "TableOptionDelta::default_use_memory_table")] + use_memory_table: bool, +} + +impl TableOptionDelta { + #[inline] + pub fn default_use_memory_table() -> bool { + true + } +} + +impl Default for TableOptionDelta { + fn default() -> Self { + Self { + use_memory_table: Self::default_use_memory_table(), + } + } +} + // Adding new table format: // * update TableLoadOption enum to add the new variant // * update TableLoadOption.extension @@ -165,7 +186,7 @@ pub enum TableLoadOption { ndjson {}, parquet(TableOptionParquet), google_spreadsheet(TableOptionGoogleSpreasheet), - delta {}, + delta(TableOptionDelta), arrow {}, arrows {}, } @@ -194,6 +215,13 @@ impl TableLoadOption { } } + fn as_delta(&self) -> Result<&TableOptionDelta, ColumnQError> { + match self { + Self::delta(opt) => Ok(opt), + _ => Err(ColumnQError::ExpectFormatOption("delta".to_string())), + } + } + pub fn extension<'a>(&'a self) -> &'static str { match self { Self::json { .. } => "json", @@ -352,7 +380,7 @@ pub async fn load(t: &TableSource) -> Result, ColumnQErro TableLoadOption::google_spreadsheet(_) => { Arc::new(google_spreadsheets::to_mem_table(t).await?) } - TableLoadOption::delta { .. } => Arc::new(delta::to_mem_table(t).await?), + TableLoadOption::delta { .. } => delta::to_datafusion_table(t).await?, TableLoadOption::arrow { .. } => Arc::new(arrow_ipc_file::to_mem_table(t).await?), TableLoadOption::arrows { .. } => Arc::new(arrow_ipc_stream::to_mem_table(t).await?), }) diff --git a/columnq/src/table/parquet.rs b/columnq/src/table/parquet.rs index 8fe03d6..3fbd1d1 100644 --- a/columnq/src/table/parquet.rs +++ b/columnq/src/table/parquet.rs @@ -114,7 +114,7 @@ mod tests { match t.as_any().downcast_ref::() { Some(_) => Ok(()), - None => panic!("not read a datafusion::ParquetTable"), + None => panic!("must be of type datafusion::datasource::parquet::ParquetTable"), } } diff --git a/test_data/blogs-delta/.part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet.crc b/test_data/blogs-delta/.part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet.crc new file mode 100644 index 0000000000000000000000000000000000000000..0b07196a77f7922a9259101156d8bc5864f0065b GIT binary patch literal 48 zcmV-00MGwpa$^7h00IEwuc^q}GW}+xANN{VPnL2@O8H`k_q7cJX?YM-+1a!TyO;9S GrM?&e*B0ym literal 0 HcmV?d00001 diff --git a/test_data/blogs-delta/_delta_log/00000000000000000000.json b/test_data/blogs-delta/_delta_log/00000000000000000000.json new file mode 100644 index 0000000..ebf5936 --- /dev/null +++ b/test_data/blogs-delta/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"commitInfo":{"timestamp":1630864087805,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"4750","numOutputRows":"500"}}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"373e4420-3d29-4f0b-9ec0-0427dd6f9c68","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"reply_id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"next_id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"blog_id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1630864087072}} +{"add":{"path":"part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet","partitionValues":{},"size":4750,"modificationTime":1630864087000,"dataChange":true}} diff --git a/test_data/blogs-delta/part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet b/test_data/blogs-delta/part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..575b33b7e1dc149ba7d3af268be9ea8022a4b487 GIT binary patch literal 4750 zcmbtYXH*kRvksw!9;zfDkRToDy-Kf26GVDR=nx7ewgBE6#sf)o*?C>`kn(xi!C zK>|n(5UDqQuip2){l`6Lf6VO6K07<}oH=vWP{ml35x^Kt%a}?l$hby|mq{2wtjt7` zRqrC0iHbx>BAM0+dHKE@&x#L| zx$USN_i4mZMz&3R=q_)s#d=1j<_tnbfG(GNWeF4LaXh-!Yga=b$^ESCRjq)iN{UuV7uO%LW*WLzK)lANxeI50z9%D7;Lf#2k`dFu@!Y^ zx)KYfQUsqf#k?mc7tn9AWyvF<1YjXdBI#76-hQy?rZSSa88Od-%GZ-~hHi>?UGB~F zkXEe7?v3PQgG(xWDj^B$x;}(chEs%=?kIHWO?oDDlU*jOL~SfRz$hJ;m1wi(ziAIW z#6<1yGgbPG$~&0?#212iUJKMgz;jp5$oMp6hurME?h9Ttl-*p!KGEtPWRTaavaQ61 z!h27v&R7K%G^2u(ZR}-yXILr*cCSe8?ThDc@$o<1U??}7bLne=BC2Q^g$3e< z`9Q$?P>yjQ6Z#Sxk*}Vs7{kdZ5yhs7lE{8*R2_Hbm|gQT*UT<%d+Qc|Ts)u&Tp*VbX$g}kP6N->oFpl1u54R&e2b1d^5co#m*I^aKKYoG`= z_T4+7&TPbt{vxeWq7-@C;08R5uWe3isx@;;g{GAz%t1n6W{i-v9MfJa=sNeonxCBu zmLf7ObLHl}WNpWh_}*vWGU=3V8SK*%@9xx^?0j}eEpnEfSSq&MrB$%~+}sfODdPhb z0i$g&2N6!+C^5J&squ8(_NlybMnc@QSec7pk7GvGfv?JAcF`{Wc2Zl9UZ>2}1x9Y! zp&p{TelNBg(+^CD!rwbidmZiwT6Ivxuht=*N)jUwryX?jN5IRyLtL1B&#U~y zqhr~mBBl!p*z97&5xrIjLH4t$#F#1Lrn2H%dWfUDt0Fz(<&=S)cjo7JU?R1Hw$=WbqSbB|z zuZ67QS3`fljy=q`$4j$gYdH*LJodcTcygR@UwBG|WKW(?(D)$Wi;2z@TV2F+lwVxB zJGqu?bUxjzc@f4^F-Zn}Q4JZDEgcZ&S&*<~J~`jDwxm%8Sw$Pu>R3T-@mv zQ+|?G+0OkIPd^-t@h<$I^aJtP5g?_k=_8myJ$WU@2h<-TndS+3|C?<*3s1q=5YRQe z5OHZc)iF+KAuXORXoM*v%W7zQjJEKp%hpGQB4x~{by~=to*tWNFCnaH>y{^k=W}Ti zzgd(=jiOJIQ@oT1@PYDNYc5n+}8C zj3vumxwA35aFgs5&>cE(IKzE{2s`)_ zWTaZVQrAxnKwN{fd?E*uXg*sv_fsPDqAIE+FLo{s#C|k=krCiiI8j~dXoco~V7UA# zlQT0*GI@HI$+GpWXnYQQML?ffE!m;+aq%*pC*U-W`z3%aF&mxfrBDYem}c{=!9_=cY_=&3)~IEiMc0fR~m#oeBE^?)Wnj9PnE zy;xPZyqX&K3)H82qD?V~`WPT~M+xSiMtOW~2* z$FUqjx0lJqTeO(Jim{v+>;8;-D2EPrr`_9Dy(ZIQQf^OsB7T@Z<`p0qfvnB%{_2fQ z8!8J|GU-x(Wo~9hAKz&2ecs|2R!yLKp+>{;=11}SEMx7<8rTFTCwsxqN4aK+)P+#x z`-c^MUO)zR6CDsTf^VJmF!j6_(dWSTQyp74A>M*(e4b^Z)RvMP@+nWad`FTp9{gUA z!9Vh1VBcrKKP3%NZ8gU=ik`*p)wl}1NumQ#$}Rrz6`FVt7Zsp09Fj_J=Fm(UjgQPb zj^sM$h-j|UKyH&|s9<_}%@+1hBhPrEd`_Km4C7~XA^W1rS<`)(s-(B1mLxkaTCbD2 z5nHzss|q$_a}^btPZDvr7q{TiT@_j*g3;52W=A97mGME%f`|Q)0xx+LuN`oTX(8ESo_sKFi zdGn0{%EV^b51DXvq|h%nzS5*zMUVkOt6)B=Yu@I&XjN0DYHU7G|2BUAA<+NfHxlKC zJm_be#Rc^9!mUotjf2M0x{b-0gJu6RX4Z54x!t{QX^Xv5X2yP6B2;hatnJCpO!U0w zzD&)5#Q7>!bG7z56^Aunh-29eUduCP;jY zb>%iQ{__NbI!kp@T3JEZBE}QG}I$F)(W-JuqlBV-Sd}& zL2kaC-W6IfnIo}S6*KnDQ))a4QVA8cL6&Yy6Fn=+GNzJWP|z^6dx2J>s7#DA+c0p1 zzLc4cueNQIkQI4)%JRzaCTBz7F;v6S?Iew+-1*Bw=uT+ZW##*CG`=FXSD&MuzSpWu zjn{uy>3EZemSU+gIHYE-e9vRuF|9gB_U(RP32+XoN5qFYhBA*ov_TsZ=^9;aReV|l zabtf4MfTaK^_K^a|B&eZ$t7rH6;wCNmBCp{swrH+7B>@h=t(G2;-7HmPO-{VNNG#m z=xkAum$~zmfuQeP?pP;U<*|1LqGr_qEiZDPQ0YPg^q4n`yO>!+BG;sTpx=WJhoimmCYKC?#%??f$w zn4o)ump+CwP~!S-sIuY0O&sq#=EXSL+h$K#Y`;q#>9Hme9wzdqT!`zw{)1Y0{M1sW z7OfjiFxTk9A0?orzQeFO7W8bZ5=Iy)mYA&yjb=pYC{`4!e@XH&DvySr#>cU@hA zLwvJ8qm;9W)ojfvMs(nC&hjDAwuH?VRpR0prC~YNgBPZ@1dA}4uKWTwu4QqX#Jnj> zy3$7Z6UP|RKIjz`vnEN?GIUBObMWHLyio>Qw#{w9^`-W{*M1$>uZYc>oxaLRFiXzs z$Fy8kDkkbw<(N$1*fX_$FWr7C+53fltWf@9Usaw6v_=Ax3l*rj_xzp=;dPoS1#V}Ff;RqD3=^{UVXPjC-MxDB&P!xuXGbNFuqA_fLP@UJgnG06B90`JhWEe(?6ISwYk-lEaRbPkH(78+RNpA}YA z+!F&5pbk&Du%1Ry)(Rp^>Bk5To3SxU>2E6y6Me1YOW~ORikZH9C~NkDdg( z-%N1%^B|IE&Cx~(vmB=F$mNdD{))@=O&n27jIC7R=`U> z7=c_sW{@w^+at)<74er!>V>@H_b-hIZzOf_Kso=?nAS)ktPllm5Gx)@k^}^R{C$uS zh_^DW(lYJPa)T89xBmbLD}Iu~--927xN)?9L;bn*FC;7((>WR5j)(etrvJ|){y(7( zSy&l~en<5u;=iDd;Y`t-+=na*%>T_?_CJkl|A1ziQo_Ft01;$nM)^7m*?ZgLb0_q# zGztYEebKHcFJ@wKArT=F05jiz@j&}{2;npC