From 5ace8b86956a32d1a759a6cb12b5277de3ce3707 Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Fri, 3 Sep 2021 21:36:18 +0200 Subject: [PATCH] Lazy load parquet (#63) * PoC for parquet: reading a table by registering parquet directly. * Adding config flag and restoring existing _in-memory_ code path. * Addressing review comment: separate `to_mem_table()`. * Addressing review comment: default-able `LoadOptionParquet`. * Adding test: make sure we instantiated `datafusion::datasource::ParquetTable` --- columnq/src/columnq.rs | 6 +-- columnq/src/table/mod.rs | 84 +++++++++++++++++++++--------- columnq/src/table/parquet.rs | 72 +++++++++++++++++++------ columnq/src/test_util.rs | 4 +- test_data/blogs_flattened.parquet | Bin 0 -> 4750 bytes 5 files changed, 119 insertions(+), 47 deletions(-) create mode 100644 test_data/blogs_flattened.parquet diff --git a/columnq/src/columnq.rs b/columnq/src/columnq.rs index 6d93f33..cdb99e4 100644 --- a/columnq/src/columnq.rs +++ b/columnq/src/columnq.rs @@ -1,9 +1,8 @@ use std::collections::HashMap; -use std::sync::Arc; use datafusion::arrow; pub use datafusion::execution::context::ExecutionConfig; -use datafusion::{datasource::TableProvider, execution::context::ExecutionContext}; +use datafusion::execution::context::ExecutionContext; use crate::error::{ColumnQError, QueryError}; use crate::query; @@ -28,8 +27,7 @@ impl ColumnQ { pub async fn load_table(&mut self, t: &TableSource) -> Result<(), ColumnQError> { let table = table::load(t).await?; self.schema_map.insert(t.name.clone(), table.schema()); - self.dfctx - .register_table(t.name.as_str(), Arc::new(table))?; + self.dfctx.register_table(t.name.as_str(), table)?; Ok(()) } diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index 9cd690b..21d3ec3 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -2,6 +2,9 @@ use std::convert::TryFrom; use std::ffi::OsStr; use std::io::Read; use std::path::Path; +use std::sync::Arc; + +use datafusion::datasource::TableProvider; use datafusion::arrow; use serde::de::{Deserialize, Deserializer}; @@ -119,6 +122,27 @@ impl Default for TableOptionCsv { } } +#[derive(Deserialize, Debug, Clone, Eq, PartialEq)] +pub struct TableOptionParquet { + #[serde(default = "TableOptionParquet::default_use_memory_table")] + use_memory_table: bool, +} + +impl TableOptionParquet { + #[inline] + pub fn default_use_memory_table() -> bool { + true + } +} + +impl Default for TableOptionParquet { + fn default() -> Self { + Self { + use_memory_table: Self::default_use_memory_table(), + } + } +} + // Adding new table format: // * update TableLoadOption enum to add the new variant // * update TableLoadOption.extension @@ -137,7 +161,7 @@ pub enum TableLoadOption { }, csv(TableOptionCsv), ndjson {}, - parquet {}, + parquet(TableOptionParquet), google_spreadsheet(TableOptionGoogleSpreasheet), delta {}, } @@ -159,6 +183,13 @@ impl TableLoadOption { } } + fn as_parquet(&self) -> Result<&TableOptionParquet, ColumnQError> { + match self { + Self::parquet(opt) => Ok(opt), + _ => Err(ColumnQError::ExpectFormatOption("parquet".to_string())), + } + } + pub fn extension<'a>(&'a self) -> &'static str { match self { Self::json { .. } => "json", @@ -305,33 +336,34 @@ impl TableSource { } } -pub async fn load(t: &TableSource) -> Result { +pub async fn load(t: &TableSource) -> Result, ColumnQError> { if let Some(opt) = &t.option { - return Ok(match opt { - TableLoadOption::json { .. } => json::to_mem_table(t).await?, - TableLoadOption::ndjson { .. } => ndjson::to_mem_table(t).await?, - TableLoadOption::csv { .. } => csv::to_mem_table(t).await?, - TableLoadOption::parquet { .. } => parquet::to_mem_table(t).await?, - TableLoadOption::google_spreadsheet(_) => google_spreadsheets::to_mem_table(t).await?, - TableLoadOption::delta { .. } => delta::to_mem_table(t).await?, - }); + Ok(match opt { + TableLoadOption::json { .. } => Arc::new(json::to_mem_table(t).await?), + TableLoadOption::ndjson { .. } => Arc::new(ndjson::to_mem_table(t).await?), + TableLoadOption::csv { .. } => Arc::new(csv::to_mem_table(t).await?), + TableLoadOption::parquet { .. } => parquet::to_datafusion_table(t).await?, + TableLoadOption::google_spreadsheet(_) => { + Arc::new(google_spreadsheets::to_mem_table(t).await?) + } + TableLoadOption::delta { .. } => Arc::new(delta::to_mem_table(t).await?), + }) + } else { + let t: Arc = match t.extension()? { + "csv" => Arc::new(csv::to_mem_table(t).await?), + "json" => Arc::new(json::to_mem_table(t).await?), + "ndjson" => Arc::new(ndjson::to_mem_table(t).await?), + "parquet" => parquet::to_datafusion_table(t).await?, + ext => { + return Err(ColumnQError::InvalidUri(format!( + "failed to register `{}` as table `{}`, unsupported table format `{}`", + t.io_source, t.name, ext, + ))); + } + }; + + Ok(t) } - - // no format specified explictly, try to guess from file path - let t = match t.extension()? { - "csv" => csv::to_mem_table(t).await?, - "json" => json::to_mem_table(t).await?, - "ndjson" => ndjson::to_mem_table(t).await?, - "parquet" => parquet::to_mem_table(t).await?, - ext => { - return Err(ColumnQError::InvalidUri(format!( - "failed to register `{}` as table `{}`, unsupported table format `{}`", - t.io_source, t.name, ext, - ))); - } - }; - - Ok(t) } /// For parsing table URI arg in CLI diff --git a/columnq/src/table/parquet.rs b/columnq/src/table/parquet.rs index b826a4f..8fe03d6 100644 --- a/columnq/src/table/parquet.rs +++ b/columnq/src/table/parquet.rs @@ -1,19 +1,37 @@ use std::io::Read; use std::sync::Arc; +use crate::error::ColumnQError; +use crate::table::{TableLoadOption, TableOptionParquet, TableSource}; + use datafusion::arrow; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::parquet::ParquetTable; +use datafusion::datasource::TableProvider; use datafusion::parquet::arrow::{ArrowReader, ParquetFileArrowReader}; use datafusion::parquet::file::reader::SerializedFileReader; use datafusion::parquet::file::serialized_reader::SliceableCursor; -use crate::error::ColumnQError; -use crate::table::TableSource; +pub async fn to_datafusion_table(t: &TableSource) -> Result, ColumnQError> { + let opt = t + .option + .clone() + .unwrap_or_else(|| TableLoadOption::parquet(TableOptionParquet::default())); + let TableOptionParquet { use_memory_table } = opt.as_parquet()?; -pub async fn to_mem_table( - t: &TableSource, -) -> Result { + if *use_memory_table { + to_mem_table(t).await + } else { + Ok(Arc::new( + ParquetTable::try_new(t.parsed_uri()?, 4).map_err(|err| { + ColumnQError::LoadParquet(format!("failed to load parquet: '{}'", err.to_string())) + })?, + )) + } +} + +pub async fn to_mem_table(t: &TableSource) -> Result, ColumnQError> { // TODO: make batch size configurable let batch_size = 1024; @@ -56,14 +74,15 @@ pub async fn to_mem_table( )); } - Ok(datafusion::datasource::MemTable::try_new( - Arc::new( - schema.ok_or_else(|| { + let table = + Arc::new(datafusion::datasource::MemTable::try_new( + Arc::new(schema.ok_or_else(|| { ColumnQError::LoadParquet("schema not found for table".to_string()) - })?, - ), - partitions, - )?) + })?), + partitions, + )?); + + Ok(table) } #[cfg(test)] @@ -71,11 +90,34 @@ mod tests { use super::*; use std::fs; - use datafusion::datasource::TableProvider; - use crate::table::TableLoadOption; use crate::test_util::*; + #[tokio::test] + async fn load_flattened_parquet() -> Result<(), ColumnQError> { + let t = to_datafusion_table( + &TableSource::new( + "blogs".to_string(), + test_data_path("blogs_flattened.parquet"), + ) + .with_option(TableLoadOption::parquet(TableOptionParquet { + use_memory_table: false, + })), + ) + .await?; + + assert_eq!(t.statistics().num_rows, Some(500)); + let stats = t.statistics().column_statistics.unwrap(); + assert_eq!(stats[0].null_count, Some(245)); + assert_eq!(stats[1].null_count, Some(373)); + assert_eq!(stats[2].null_count, Some(237)); + + match t.as_any().downcast_ref::() { + Some(_) => Ok(()), + None => panic!("not read a datafusion::ParquetTable"), + } + } + #[tokio::test] async fn load_simple_parquet() -> Result<(), ColumnQError> { let t = to_mem_table(&TableSource::new( @@ -110,7 +152,7 @@ mod tests { let t = to_mem_table( &TableSource::new_with_uri("blogs", tmp_dir_path.to_string_lossy()) - .with_option(TableLoadOption::parquet {}), + .with_option(TableLoadOption::parquet(TableOptionParquet::default())), ) .await?; diff --git a/columnq/src/test_util.rs b/columnq/src/test_util.rs index 78bbf7f..87066b5 100644 --- a/columnq/src/test_util.rs +++ b/columnq/src/test_util.rs @@ -89,7 +89,7 @@ fn properties_table() -> anyhow::Result { Ok(MemTable::try_new(schema, vec![vec![record_batch]])?) } -async fn ubuntu_ami_table() -> anyhow::Result { +async fn ubuntu_ami_table() -> anyhow::Result> { let mut table_source: table::TableSource = serde_yaml::from_str( r#" name: "ubuntu_ami" @@ -131,7 +131,7 @@ pub fn register_table_properties(dfctx: &mut ExecutionContext) -> anyhow::Result } pub async fn register_table_ubuntu_ami(dfctx: &mut ExecutionContext) -> anyhow::Result<()> { - dfctx.register_table("ubuntu_ami", Arc::new(ubuntu_ami_table().await?))?; + dfctx.register_table("ubuntu_ami", ubuntu_ami_table().await?)?; Ok(()) } diff --git a/test_data/blogs_flattened.parquet b/test_data/blogs_flattened.parquet new file mode 100644 index 0000000000000000000000000000000000000000..9a105dfbff38e2d1448ebcc4b2d702371dcfd4c0 GIT binary patch literal 4750 zcmbtYXH*kRvksw!9;zfDkRToDy-Kf26GVDR=nx7ewgBE6#sf)o*?C>`kn(xi!C zK>|n(5UDqQuip2){l`6Lf6VO6K07<}oH=vWP{ml35x^Kt%a}?l$hby|mq{2wtjt7` zRqrC0iHbx>BAM0+dHKE@&x#L| zx$USN_i4mZMz&3R=q_)s#d=1j<_tnbfG(GNWeF4LaXh-!Yga=b$^ESCRjq)iN{UuV7uO%LW*WLzK)lANxeI50z9%D7;Lf#2k`dFu@!Y^ zx)KYfQUsqf#k?mc7tn9AWyvF<1YjXdBI#76-hQy?rZSSa88Od-%GZ-~hHi>?UGB~F zkXEe7?v3PQgG(xWDj^B$x;}(chEs%=?kIHWO?oDDlU*jOL~SfRz$hJ;m1wi(ziAIW z#6<1yGgbPG$~&0?#212iUJKMgz;jp5$oMp6hurME?h9Ttl-*p!KGEtPWRTaavaQ61 z!h27v&R7K%G^2u(ZR}-yXILr*cCSe8?ThDc@$o<1U??}7bLne=BC2Q^g$3e< z`9Q$?P>yjQ6Z#Sxk*}Vs7{kdZ5yhs7lE{8*R2_Hbm|gQT*UT<%d+Qc|Ts)u&Tp*VbX$g}kP6N->oFpl1u54R&e2b1d^5co#m*I^aKKYoG`= z_T4+7&TPbt{vxeWq7-@C;08R5uWe3isx@;;g{GAz%t1n6W{i-v9MfJa=sNeonxCBu zmLf7ObLHl}WNpWh_}*vWGU=3V8SK*%@9xx^?0j}eEpnEfSSq&MrB$%~+}sfODdPhb z0i$g&2N6!+C^5J&squ8(_NlybMnc@QSec7pk7GvGfv?JAcF`{Wc2Zl9UZ>2}1x9Y! zp&p{TelNBg(+^CD!rwbidmZiwT6Ivxuht=*N)jUwryX?jN5IRyLtL1B&#U~y zqhr~mBBl!p*z97&5xrIjLH4t$#F#1Lrn2H%dWfUDt0Fz(<&=S)cjo7JU?R1Hw$=WbqSbB|z zuZ67QS3`fljy=q`$4j$gYdH*LJodcTcygR@UwBG|WKW(?(D)$Wi;2z@TV2F+lwVxB zJGqu?bUxjzc@f4^F-Zn}Q4JZDEgcZ&S&*<~J~`jDwxm%8Sw$Pu>R3T-@mv zQ+|?G+0OkIPd^-t@h<$I^aJtP5g?_k=_8myJ$WU@2h<-TndS+3|C?<*3s1q=5YRQe z5OHZc)iF+KAuXORXoM*v%W7zQjJEKp%hpGQB4x~{by~=to*tWNFCnaH>y{^k=W}Ti zzgd(=jiOJIQ@oT1@PYDNYc5n+}8C zj3vumxwA35aFgs5&>cE(IKzE{2s`)_ zWTaZVQrAxnKwN{fd?E*uXg*sv_fsPDqAIE+FLo{s#C|k=krCiiI8j~dXoco~V7UA# zlQT0*GI@HI$+GpWXnYQQML?ffE!m;+aq%*pC*U-W`z3%aF&mxfrBDYem}c{=!9_=cY_=&3)~IEiMc0fR~m#oeBE^?)Wnj9PnE zy;xPZyqX&K3)H82qD?V~`WPT~M+xSiMtOW~2* z$FUqjx0lJqTeO(Jim{v+>;8;-D2EPrr`_9Dy(ZIQQf^OsB7T@Z<`p0qfvnB%{_2fQ z8!8J|GU-x(Wo~9hAKz&2ecs|2R!yLKp+>{;=11}SEMx7<8rTFTCwsxqN4aK+)P+#x z`-c^MUO)zR6CDsTf^VJmF!j6_(dWSTQyp74A>M*(e4b^Z)RvMP@+nWad`FTp9{gUA z!9Vh1VBcrKKP3%NZ8gU=ik`*p)wl}1NumQ#$}Rrz6`FVt7Zsp09Fj_J=Fm(UjgQPb zj^sM$h-j|UKyH&|s9<_}%@+1hBhPrEd`_Km4C7~XA^W1rS<`)(s-(B1mLxkaTCbD2 z5nHzss|q$_a}^btPZDvr7q{TiT@_j*g3;52W=A97mGME%f`|Q)0xx+LuN`oTX(8ESo_sKFi zdGn0{%EV^b51DXvq|h%nzS5*zMUVkOt6)B=Yu@I&XjN0DYHU7G|2BUAA<+NfHxlKC zJm_be#Rc^9!mUotjf2M0x{b-0gJu6RX4Z54x!t{QX^Xv5X2yP6B2;hatnJCpO!U0w zzD&)5#Q7>!bG7z56^Aunh-29eUduCP;jY zb>%iQ{__NbI!kp@T3JEZBE}QG}I$F)(W-JuqlBV-Sd}& zL2kaC-W6IfnIo}S6*KnDQ))a4QVA8cL6&Yy6Fn=+GNzJWP|z^6dx2J>s7#DA+c0p1 zzLc4cueNQIkQI4)%JRzaCTBz7F;v6S?Iew+-1*Bw=uT+ZW##*CG`=FXSD&MuzSpWu zjn{uy>3EZemSU+gIHYE-e9vRuF|9gB_U(RP32+XoN5qFYhBA*ov_TsZ=^9;aReV|l zabtf4MfTaK^_K^a|B&eZ$t7rH6;wCNmBCp{swrH+7B>@h=t(G2;-7HmPO-{VNNG#m z=xkAum$~zmfuQeP?pP;U<*|1LqGr_qEiZDPQ0YPg^q4n`yO>!+BG;sTpx=WJhoimmCYKC?#%??f$w zn4o)ump+CwP~!S-sIuY0O&sq#=EXSL+h$K#Y`;q#>9Hme9wzdqT!`zw{)1Y0{M1sW z7OfjiFxTk9A0?orzQeFO7W8bZ5=Iy)mYA&yjb=pYC{`4!e@XH&DvySr#>cU@hA zLwvJ8qm;9W)ojfvMs(nC&hjDAwuH?VRpR0prC~YNgBPZ@1dA}4uKWTwu4QqX#Jnj> zy3$7Z6UP|RKIjz`vnEN?GIUBObMWHLyio>Qw#{w9^`-W{*M1$>uZYc>oxaLRFiXzs z$Fy8kDkkbw<(N$1*fX_$FWr7C+53fltWf@9Usaw6v_=Ax3l*rj_xzp=;dPoS1#V}Ff;RqD3=^{UVXPjC-MxDB&P!xuXGbNFuqA_fLP@UJgnG06B90`JhWEe(?6ISwYk-lEaRbPkH(78+RNpA}YA z+!F&5pbk&Du%1Ry)(Rp^>Bk5To3SxU>2E6y6Me1YOW~ORikZH9C~NkDdg( z-%N1%^B|IE&Cx~(vmB=F$mNdD{))@=O&n27jIC7R=`U> z7=c_sW{@w^+at)<74er!>V>@H_b-hIZzOf_Kso=?nAS)ktPllm5Gx)@5(FTD{C$uS zh_^DW(lYJPa)T89xBmbLD}Iu~--927xN)?9L;bn*FC;7((>WR5j)(etrvJ|){y(7( zSy&l~en<5u;=iDd;Y`t-+=na*%>T_?_CJkl|A1ziQo_Ft01;$nM)^7m*?ZgLb0_q# zGztYEebKHcFJ@wKArT=F05jiz@j&}{2;npC