Lazy load delta: Support for large tables (#71)

* Allow for delta tables to be directly backed by storage.

Enables experimental support for delta tables that are too large to be
stored in memory. We directly expose `DeltaTable` instead of copying the
data into a datafusion::Memtable.

Disadvantages:
- in the new mode, no support for S3
- as we're relying on datafusion to handle the parquet files directly,
  nested schemas and certain data types may not work properly.
This commit is contained in:
Thomas Peiselt 2021-09-06 09:56:55 +02:00 committed by GitHub
parent ff2d06b0e4
commit ea84099b07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 153 additions and 16 deletions

8
Cargo.lock generated
View File

@ -765,7 +765,7 @@ dependencies = [
[[package]]
name = "deltalake"
version = "0.4.1"
source = "git+https://github.com/delta-io/delta-rs.git?rev=61e2941cc5787ac2028efea271a54926f9c45cec#61e2941cc5787ac2028efea271a54926f9c45cec"
source = "git+https://github.com/delta-io/delta-rs.git?rev=2a0d3632f44a5ffa8cf6bf953615699547856719#2a0d3632f44a5ffa8cf6bf953615699547856719"
dependencies = [
"anyhow",
"arrow",
@ -774,6 +774,7 @@ dependencies = [
"cfg-if 1.0.0",
"chrono",
"clap",
"datafusion",
"env_logger",
"errno",
"futures",
@ -3323,3 +3324,8 @@ dependencies = [
"cc",
"libc",
]
[[patch.unused]]
name = "deltalake"
version = "0.4.1"
source = "git+https://github.com/delta-io/delta-rs.git?rev=61e2941cc5787ac2028efea271a54926f9c45cec#61e2941cc5787ac2028efea271a54926f9c45cec"

View File

@ -36,7 +36,9 @@ rusoto_s3 = { version = "0.46" }
rusoto_credential = { version = "0.46" }
rusoto_sts = { version = "0.46" }
deltalake = { version = "0", features = ["s3"] }
deltalake = { git = "https://github.com/delta-io/delta-rs.git", rev = "2a0d3632f44a5ffa8cf6bf953615699547856719", features = ["s3", "datafusion-ext"] }
# deltalake = { version = "0", features = ["s3", "datafusion-ext"] }
[dev-dependencies]
anyhow = "1"

View File

@ -4,13 +4,54 @@ use std::sync::Arc;
use datafusion::arrow;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::TableProvider;
use datafusion::parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use datafusion::parquet::file::reader::SerializedFileReader;
use datafusion::parquet::file::serialized_reader::SliceableCursor;
use crate::error::ColumnQError;
use crate::io;
use crate::table::TableSource;
use crate::table::{TableLoadOption, TableOptionDelta, TableSource};
use deltalake;
pub async fn to_datafusion_table(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQError> {
let opt = t
.option
.clone()
.unwrap_or_else(|| TableLoadOption::delta(TableOptionDelta::default()));
let TableOptionDelta { use_memory_table } = opt.as_delta()?;
let uri_str = t.get_uri_str();
let delta_table = deltalake::open_table(uri_str).await?;
let parsed_uri = t.parsed_uri()?;
let blob_type = io::BlobStoreType::try_from(parsed_uri.scheme())?;
if *use_memory_table {
to_mem_table(delta_table, blob_type).await
} else {
to_delta_table(delta_table, blob_type).await
}
}
pub async fn to_delta_table(
delta_table: deltalake::DeltaTable,
blob_type: io::BlobStoreType,
) -> Result<Arc<dyn TableProvider>, ColumnQError> {
match blob_type {
io::BlobStoreType::FileSystem => Ok(Arc::new(delta_table)),
io::BlobStoreType::S3 => Err(ColumnQError::LoadDelta(format!(
"S3 for delta table currently only supported in conjunction with `to_memory_table` config: {}",
delta_table.table_uri,
))),
_ => {
return Err(ColumnQError::InvalidUri(format!(
"Scheme in table uri not supported for delta table: {}",
delta_table.table_uri,
)));
}
}
}
fn read_partition<R: Read>(mut r: R, batch_size: usize) -> Result<Vec<RecordBatch>, ColumnQError> {
let mut buffer = Vec::new();
@ -32,21 +73,17 @@ fn read_partition<R: Read>(mut r: R, batch_size: usize) -> Result<Vec<RecordBatc
}
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
delta_table: deltalake::DeltaTable,
blob_type: io::BlobStoreType,
) -> Result<Arc<dyn TableProvider>, ColumnQError> {
// TODO: make batch size configurable
let batch_size = 1024;
let uri_str = t.get_uri_str();
let delta_table = deltalake::open_table(uri_str).await?;
if delta_table.get_files().is_empty() {
return Err(ColumnQError::LoadDelta("empty delta table".to_string()));
}
let delta_schema = delta_table.get_schema()?;
let uri = t.parsed_uri()?;
let blob_type = io::BlobStoreType::try_from(uri.scheme())?;
let paths = delta_table.get_file_uris();
let path_iter = paths.iter().map(|s| s.as_str());
@ -70,13 +107,73 @@ pub async fn to_mem_table(
_ => {
return Err(ColumnQError::InvalidUri(format!(
"Scheme in table uri not supported for delta table: {}",
uri_str,
delta_table.table_uri,
)));
}
};
Ok(datafusion::datasource::MemTable::try_new(
Ok(Arc::new(datafusion::datasource::MemTable::try_new(
Arc::new(delta_schema.try_into()?),
partitions,
)?)
)?))
}
#[cfg(test)]
mod tests {
use super::*;
use datafusion::datasource::datasource::Statistics;
use datafusion::datasource::MemTable;
use deltalake::DeltaTable;
use crate::error::ColumnQError;
use crate::test_util::test_data_path;
#[tokio::test]
async fn load_delta_as_memtable() -> Result<(), ColumnQError> {
let t = to_datafusion_table(
&TableSource::new("blogs".to_string(), test_data_path("blogs-delta")).with_option(
TableLoadOption::delta(TableOptionDelta {
use_memory_table: true,
}),
),
)
.await?;
validate_statistics(t.statistics());
match t.as_any().downcast_ref::<MemTable>() {
Some(_) => Ok(()),
None => panic!("must be of type datafusion::datasource::MemTable"),
}
}
#[tokio::test]
async fn load_delta_as_delta_source() -> Result<(), ColumnQError> {
let t = to_datafusion_table(
&TableSource::new("blogs".to_string(), test_data_path("blogs-delta")).with_option(
TableLoadOption::delta(TableOptionDelta {
use_memory_table: false,
}),
),
)
.await?;
match t.as_any().downcast_ref::<DeltaTable>() {
Some(delta_table) => {
assert_eq!(delta_table.version, 0);
Ok(())
}
None => panic!("must be of type deltalake::DeltaTable"),
}
}
fn validate_statistics(stats: Statistics) {
assert_eq!(stats.num_rows, Some(500));
let column_stats = stats.column_statistics.unwrap();
assert_eq!(column_stats[0].null_count, Some(245));
assert_eq!(column_stats[1].null_count, Some(373));
assert_eq!(column_stats[2].null_count, Some(237));
}
}

View File

@ -145,6 +145,27 @@ impl Default for TableOptionParquet {
}
}
#[derive(Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct TableOptionDelta {
#[serde(default = "TableOptionDelta::default_use_memory_table")]
use_memory_table: bool,
}
impl TableOptionDelta {
#[inline]
pub fn default_use_memory_table() -> bool {
true
}
}
impl Default for TableOptionDelta {
fn default() -> Self {
Self {
use_memory_table: Self::default_use_memory_table(),
}
}
}
// Adding new table format:
// * update TableLoadOption enum to add the new variant
// * update TableLoadOption.extension
@ -165,7 +186,7 @@ pub enum TableLoadOption {
ndjson {},
parquet(TableOptionParquet),
google_spreadsheet(TableOptionGoogleSpreasheet),
delta {},
delta(TableOptionDelta),
arrow {},
arrows {},
}
@ -194,6 +215,13 @@ impl TableLoadOption {
}
}
fn as_delta(&self) -> Result<&TableOptionDelta, ColumnQError> {
match self {
Self::delta(opt) => Ok(opt),
_ => Err(ColumnQError::ExpectFormatOption("delta".to_string())),
}
}
pub fn extension<'a>(&'a self) -> &'static str {
match self {
Self::json { .. } => "json",
@ -352,7 +380,7 @@ pub async fn load(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQErro
TableLoadOption::google_spreadsheet(_) => {
Arc::new(google_spreadsheets::to_mem_table(t).await?)
}
TableLoadOption::delta { .. } => Arc::new(delta::to_mem_table(t).await?),
TableLoadOption::delta { .. } => delta::to_datafusion_table(t).await?,
TableLoadOption::arrow { .. } => Arc::new(arrow_ipc_file::to_mem_table(t).await?),
TableLoadOption::arrows { .. } => Arc::new(arrow_ipc_stream::to_mem_table(t).await?),
})

View File

@ -114,7 +114,7 @@ mod tests {
match t.as_any().downcast_ref::<ParquetTable>() {
Some(_) => Ok(()),
None => panic!("not read a datafusion::ParquetTable"),
None => panic!("must be of type datafusion::datasource::parquet::ParquetTable"),
}
}

View File

@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1630864087805,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"4750","numOutputRows":"500"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"373e4420-3d29-4f0b-9ec0-0427dd6f9c68","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"reply_id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"next_id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"blog_id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1630864087072}}
{"add":{"path":"part-00000-91e75e45-f604-46ac-aa68-b3542389889a-c000.snappy.parquet","partitionValues":{},"size":4750,"modificationTime":1630864087000,"dataChange":true}}