make batch_size configurable (#107)

This commit is contained in:
Toby Hede 2021-11-19 11:54:37 +11:00 committed by GitHub
parent 65702fd350
commit 9e3769bb8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 31 additions and 12 deletions

View File

@ -21,7 +21,7 @@ pub async fn to_mem_table(
let delimiter = opt.delimiter;
let projection = opt.projection.as_ref();
let batch_size = 1024;
let batch_size = t.batch_size;
debug!("inferring csv table schema...");
let schema_ref: arrow::datatypes::SchemaRef = match &t.schema {

View File

@ -26,9 +26,10 @@ pub async fn to_datafusion_table(t: &TableSource) -> Result<Arc<dyn TableProvide
let delta_table = deltalake::open_table(uri_str).await?;
let parsed_uri = t.parsed_uri()?;
let blob_type = io::BlobStoreType::try_from(parsed_uri.scheme())?;
let batch_size = t.batch_size;
if *use_memory_table {
to_mem_table(delta_table, blob_type).await
to_mem_table(delta_table, blob_type, batch_size).await
} else {
to_delta_table(delta_table, blob_type).await
}
@ -75,10 +76,8 @@ fn read_partition<R: Read>(mut r: R, batch_size: usize) -> Result<Vec<RecordBatc
pub async fn to_mem_table(
delta_table: deltalake::DeltaTable,
blob_type: io::BlobStoreType,
batch_size: usize,
) -> Result<Arc<dyn TableProvider>, ColumnQError> {
// TODO: make batch size configurable
let batch_size = 1024;
if delta_table.get_files().is_empty() {
return Err(ColumnQError::LoadDelta("empty delta table".to_string()));
}

View File

@ -104,8 +104,7 @@ fn json_vec_to_partition(
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
// TODO: make batch size configurable
let batch_size = 1024;
let batch_size = t.batch_size;
let array_encoded = match &t.option {
Some(TableLoadOption::json { array_encoded, .. }) => array_encoded.unwrap_or(false),
_ => false,

View File

@ -277,6 +277,8 @@ pub struct TableSource {
pub io_source: TableIoSource,
pub schema: Option<TableSchema>,
pub option: Option<TableLoadOption>,
#[serde(default = "TableSource::default_batch_size")]
pub batch_size: usize
}
impl TableSource {
@ -285,7 +287,8 @@ impl TableSource {
name: name.into(),
io_source: source.into(),
schema: None,
option: None,
option: None,
batch_size: Self::default_batch_size(),
}
}
@ -293,6 +296,11 @@ impl TableSource {
Self::new(name, uri.into())
}
/// Returns the batch size used when a table source does not specify one.
///
/// `TableSource::new` calls this to populate `batch_size`, and the serde
/// `default` attribute on the `batch_size` field names it as the fallback
/// for definitions that omit the key.
#[inline]
pub fn default_batch_size() -> usize {
    const FALLBACK_BATCH_SIZE: usize = 1024;
    FALLBACK_BATCH_SIZE
}
pub fn with_option(mut self, option: impl Into<TableLoadOption>) -> Self {
self.option = Some(option.into());
self
@ -502,6 +510,21 @@ schema:
Ok(())
}
/// Checks that an explicit `batch_size` key in a YAML table definition is
/// carried through to the deserialised `TableSource`.
#[test]
fn batch_size_deserialisation() -> anyhow::Result<()> {
    // Fixture that sets `batch_size` to a value other than the 1024 default.
    let yaml = r#"
name: "ubuntu_ami"
uri: "test_data/ubuntu-ami.json"
batch_size: 512
"#;
    let parsed: TableSource = serde_yaml::from_str(yaml)?;

    assert_eq!(parsed.batch_size, 512);
    Ok(())
}
#[test]
fn test_parse_table_uri() {
let t = parse_table_uri_arg("t=a/b/c").unwrap();

View File

@ -11,8 +11,7 @@ use crate::table::TableSource;
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
// TODO: make batch size configurable
let batch_size = 1024;
let batch_size = t.batch_size;
let schema_ref: SchemaRef = match &t.schema {
Some(table_schema) => Arc::new(table_schema.into()),

View File

@ -32,8 +32,7 @@ pub async fn to_datafusion_table(t: &TableSource) -> Result<Arc<dyn TableProvide
}
pub async fn to_mem_table(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQError> {
// TODO: make batch size configurable
let batch_size = 1024;
let batch_size = t.batch_size;
let mut schema: Option<Schema> = None;