diff --git a/Cargo.lock b/Cargo.lock index 95ab0bc..66658a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -851,15 +851,17 @@ dependencies = [ [[package]] name = "calamine" -version = "0.19.1" +version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6381d1037ee9b8a6c8eb97936add0331a1aabd148d5b6f35f1cda6e5dec44f40" +checksum = "47a4d6ea525ea187df1e3a1c4b23469b1cbe60c5bafc1c0ef14b2b8738a8303d" dependencies = [ "byteorder", + "chrono", "codepage", "encoding_rs", "log", - "quick-xml 0.25.0", + "once_cell", + "quick-xml 0.31.0", "serde", "zip", ] @@ -3496,16 +3498,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "quick-xml" -version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58e21a144a0ffb5fad7b464babcdab934a325ad69b7c0373bcfef5cbd9799ca9" -dependencies = [ - "encoding_rs", - "memchr", -] - [[package]] name = "quick-xml" version = "0.28.2" @@ -3516,6 +3508,16 @@ dependencies = [ "serde", ] +[[package]] +name = "quick-xml" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +dependencies = [ + "encoding_rs", + "memchr", +] + [[package]] name = "quote" version = "1.0.33" diff --git a/README.md b/README.md index 2b53f71..8bf2016 100644 --- a/README.md +++ b/README.md @@ -318,7 +318,7 @@ Data layer: - [x] JSON - [x] NDJSON - [x] parquet - - [x] xls, xlsx, xlsm, ods: https://github.com/tafia/calamine + - [x] xls, xlsx, xlsb, ods: https://github.com/tafia/calamine - [x] [DeltaLake](https://delta.io/) Misc: diff --git a/columnq/Cargo.toml b/columnq/Cargo.toml index 4d979b7..f2216b5 100644 --- a/columnq/Cargo.toml +++ b/columnq/Cargo.toml @@ -39,7 +39,7 @@ reqwest = { version = "0.11", default-features = false, features = [ "blocking", "json", ] } -calamine = "0.19.1" +calamine = {version = "0.23.1", features = ["dates"]} tokio = { version = "1", features = ["rt-multi-thread"] } futures = "0.3" diff --git a/columnq/src/error.rs b/columnq/src/error.rs index 5aeae09..e290ea1 100644 --- a/columnq/src/error.rs +++ b/columnq/src/error.rs @@ -39,8 +39,8 @@ pub enum ColumnQError { #[error("Error loading Delta table: {0}")] LoadDelta(String), - #[error("Error loading Xlsx table: {0}")] - LoadXlsx(String), + #[error("Error loading Excel table: {0}")] + LoadExcel(String), #[error("Error loading data from HTTP store: {0}")] HttpStore(String), diff --git a/columnq/src/table/excel.rs b/columnq/src/table/excel.rs new file mode 100644 index 0000000..1b35fee --- /dev/null +++ b/columnq/src/table/excel.rs @@ -0,0 +1,771 @@ +use crate::table::{self, TableOptionExcel, TableSchema, TableSource}; +use arrow_schema::TimeUnit; +use calamine::{open_workbook_auto, DataType as ExcelDataType, Range, Reader, Sheets}; +use datafusion::arrow::array::{ + ArrayRef, BooleanArray, DurationSecondArray, NullArray, PrimitiveArray, StringArray, + TimestampSecondArray, +}; +use datafusion::arrow::datatypes::{ + DataType, Date32Type, Date64Type, Field, Float64Type, Int64Type, Schema, +}; +use datafusion::arrow::record_batch::RecordBatch; +use snafu::prelude::*; +use std::collections::HashMap; +use std::sync::Arc; +use std::vec; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Failed to load Excel: {msg}"))] + Load { msg: String }, + #[snafu(display("Incorrect schema: {msg}"))] + IncorrectSchema { msg: String }, + #[snafu(display("Excel schema inference error"))] + SchemaInference, + #[snafu(display("Failed to create record batch: {source}"))] + CreateRecordBatch { + source: datafusion::arrow::error::ArrowError, + }, + #[snafu(display("Failed to open workbook: {source}"))] + OpenWorkbook { source: calamine::Error }, +} + +struct ExcelSubrange<'a> { + rows: calamine::Rows<'a, ExcelDataType>, + columns_range_start: usize, + columns_range_end: usize, + total_rows: usize, + current_row_id: usize, +} + +impl<'a> ExcelSubrange<'a> { + fn new( + range: &'a Range, + rows_range_start: Option, + rows_range_end: Option, + columns_range_start: Option, + columns_range_end: Option, + ) -> ExcelSubrange { + let rows_range_start = rows_range_start.unwrap_or(usize::MIN); + let rows_range_end = rows_range_end + .or(range.end().map(|v| v.0 as usize)) + .unwrap(); + + let mut rows = range.rows(); + if rows_range_start > 0 { + // rows skipping + rows.nth(rows_range_start - 1); + } + + ExcelSubrange { + rows, + columns_range_start: columns_range_start.unwrap_or(usize::MIN), + columns_range_end: columns_range_end.unwrap_or(usize::MAX), + total_rows: rows_range_end - rows_range_start + 1, + current_row_id: 0, + } + } + + fn size(&self) -> usize { + self.total_rows + } +} + +impl<'a> Iterator for ExcelSubrange<'a> { + type Item = &'a [ExcelDataType]; + + fn next(&mut self) -> Option { + if self.current_row_id < self.total_rows { + self.current_row_id += 1; + self.rows + .next() + .map(|x| &x[self.columns_range_start..=self.columns_range_end.min(x.len() - 1)]) + } else { + None + } + } + + fn size_hint(&self) -> (usize, Option) { + (0, Some(self.total_rows)) + } +} + +fn infer_value_type(v: &ExcelDataType) -> Result { + match v { + ExcelDataType::Int(_) => Ok(DataType::Int64), + ExcelDataType::Float(_) => Ok(DataType::Float64), + ExcelDataType::String(_) => Ok(DataType::Utf8), + ExcelDataType::Bool(_) => Ok(DataType::Boolean), + ExcelDataType::DateTime(_) | ExcelDataType::DateTimeIso(_) => { + Ok(DataType::Timestamp(TimeUnit::Second, None)) + } + ExcelDataType::Duration(_) | ExcelDataType::DurationIso(_) => { + Ok(DataType::Duration(TimeUnit::Second)) + } + ExcelDataType::Error(e) => Err(Error::Load { msg: e.to_string() }), + ExcelDataType::Empty => Ok(DataType::Null), + } +} + +fn infer_schema_from_data(mut range: ExcelSubrange) -> Result { + let mut col_types: HashMap<&str, DataType> = HashMap::new(); + let col_names: Vec<&str> = range + .next() + .ok_or(Error::Load { + msg: String::from("Failed to infer schema for empty excel table"), + })? + .iter() + .enumerate() + .map(|(i, c)| { + c.get_string().ok_or_else(|| Error::Load { + msg: format!("The {i}th column name is empty"), + }) + }) + .collect::, _>>()?; + + for row in range { + for (i, col_val) in row.iter().enumerate() { + let col_name = col_names.get(i).ok_or(Error::Load { + msg: String::from( + "Failed to infer schema. Number of values in row is more then column names.", + ), + })?; + let col_type = infer_value_type(col_val)?; + col_types + .entry(col_name) + .and_modify(|ct| { + if !ct.equals_datatype(&col_type) && ct.equals_datatype(&DataType::Null) { + *ct = col_type.clone(); + } + // if column values has more than one not null type then we upcast column type to the most general datatype Utf8. + else if !ct.equals_datatype(&col_type) + && !&col_type.equals_datatype(&DataType::Null) + { + *ct = DataType::Utf8; + } + }) + .or_insert(col_type); + } + } + + let fields: Vec = col_names + .iter() + .map(|col_name| { + let dt = col_types.get(col_name).unwrap_or(&DataType::Utf8).clone(); + Field::new(col_name.replace(' ', "_"), dt, true) + }) + .collect(); + Ok(Schema::new(fields)) +} + +fn infer_schema_from_config(table_schema: &TableSchema) -> Result { + let unsupported_data_types = table_schema + .columns + .iter() + .filter(|c| { + !matches!( + c.data_type, + DataType::Boolean + | DataType::Int64 + | DataType::Float64 + | DataType::Duration(TimeUnit::Second) + | DataType::Date32 + | DataType::Date64 + | DataType::Null + | DataType::Utf8 + | DataType::Timestamp(TimeUnit::Second, None) + ) + }) + .map(|c| c.name.clone()) + .collect::>() + .join(", "); + + if unsupported_data_types.is_empty() { + Ok(table_schema.into()) + } else { + Err(Error::IncorrectSchema{msg: format!("Configured schema for excel file contains unsupported data types in columns {}. Supported datatype: \ + Boolean, Int64, Float64, Date32, Date64, !Timestamp [Second, null], !Duration [Second], Null, Utf8", unsupported_data_types)}) + } +} + +fn empty_or_panic(v: &ExcelDataType, field_name: &String) -> Option { + if v.is_empty() { + None + } else { + panic!("Incorrect value {:?} in column {}", v, field_name) + } +} + +fn infer_schema( + r: &Range, + option: &TableOptionExcel, + schema: &Option, +) -> Result { + let TableOptionExcel { + rows_range_start, + rows_range_end, + columns_range_start, + columns_range_end, + schema_inference_lines, + .. + } = *option; + + if let Some(schema) = schema { + infer_schema_from_config(schema) + } else { + let last_row_for_schema_inference = schema_inference_lines + .map(|r| r + rows_range_start.unwrap_or(0)) + .or(rows_range_end); + + let range = ExcelSubrange::new( + r, + rows_range_start, + last_row_for_schema_inference, + columns_range_start, + columns_range_end, + ); + infer_schema_from_data(range) + } +} + +fn excel_range_to_record_batch( + r: Range, + option: &TableOptionExcel, + schema: Schema, +) -> Result { + let TableOptionExcel { + rows_range_start, + rows_range_end, + columns_range_start, + columns_range_end, + .. + } = *option; + + let arrays = schema + .fields() + .iter() + .enumerate() + .map(|(i, field)| { + let rows = ExcelSubrange::new( + &r, + rows_range_start.map(|x| x + 1).or(Some(1)), // skip first row because it is header + rows_range_end, + columns_range_start, + columns_range_end, + ); + let field_name = field.name(); + + match field.data_type() { + DataType::Boolean => Arc::new( + rows.map(|r| { + r.get(i) + .and_then(|v| v.get_bool().or_else(|| empty_or_panic(v, field_name))) + }) + .collect::(), + ) as ArrayRef, + DataType::Int64 => Arc::new( + rows.map(|r| { + r.get(i) + .and_then(|v| v.get_int().or_else(|| empty_or_panic(v, field_name))) + }) + .collect::>(), + ) as ArrayRef, + DataType::Float64 => Arc::new( + rows.map(|r| { + r.get(i) + .and_then(|v| v.get_float().or_else(|| empty_or_panic(v, field_name))) + }) + .collect::>(), + ) as ArrayRef, + DataType::Duration(TimeUnit::Second) => Arc::new( + rows.map(|r| { + r.get(i).and_then(|v| { + v.as_duration() + .map(|v| v.num_seconds()) + .or_else(|| empty_or_panic(v, field_name)) + }) + }) + .collect::(), + ) as ArrayRef, + DataType::Null => Arc::new(NullArray::new(rows.size())) as ArrayRef, + DataType::Utf8 => Arc::new( + rows.map(|r| { + r.get(i).and_then(|v| match v { + ExcelDataType::Bool(x) => Some(x.to_string()), + ExcelDataType::Float(_) + | ExcelDataType::Int(_) + | ExcelDataType::String(_) => v.as_string(), + ExcelDataType::DateTime(_) | ExcelDataType::DateTimeIso(_) => { + v.as_datetime().map(|x| x.to_string()) + } + ExcelDataType::Duration(_) | ExcelDataType::DurationIso(_) => { + v.as_duration().map(|x| x.to_string()) + } + ExcelDataType::Empty => None, + ExcelDataType::Error(e) => Some(e.to_string()), + }) + }) + .collect::(), + ) as ArrayRef, + DataType::Timestamp(TimeUnit::Second, None) => Arc::new( + rows.map(|r| { + r.get(i).and_then(|v| { + v.as_datetime() + .map(|v| v.and_utc().timestamp()) + .or_else(|| empty_or_panic(v, field_name)) + }) + }) + .collect::(), + ) as ArrayRef, + DataType::Date64 => Arc::new( + rows.map(|r| { + r.get(i).and_then(|v| { + v.as_datetime() + .map(|v| v.timestamp_millis()) + .or_else(|| empty_or_panic(v, field_name)) + }) + }) + .collect::>(), + ) as ArrayRef, + DataType::Date32 => Arc::new( + rows.map(|r| { + r.get(i).and_then(|v| { + v.as_datetime() + .map(|v| (v.timestamp() / 86400) as i32) + .or_else(|| empty_or_panic(v, field_name)) + }) + }) + .collect::>(), + ) as ArrayRef, + unsupported => panic!("Unsupported data type for excel table {:?}", unsupported), + } + }) + .collect::>(); + + RecordBatch::try_new(Arc::new(schema), arrays).context(CreateRecordBatchSnafu) +} + +pub async fn to_mem_table( + t: &TableSource, +) -> Result { + let opt = t + .option + .as_ref() + .ok_or(table::Error::MissingOption {})? + .as_excel()?; + let uri = t.get_uri_str(); + let mut workbook: Sheets<_> = open_workbook_auto(uri) + .context(OpenWorkbookSnafu) + .context(table::LoadExcelSnafu)?; + + let worksheet_range = match &opt.sheet_name { + Some(sheet) => Some(workbook.worksheet_range(sheet)), + None => workbook.worksheet_range_at(0), + }; + + if let Some(Ok(range)) = worksheet_range { + let shema = infer_schema(&range, opt, &t.schema).context(table::LoadExcelSnafu)?; + let batch = + excel_range_to_record_batch(range, opt, shema).context(table::LoadExcelSnafu)?; + let schema_ref = batch.schema(); + let partitions = vec![vec![batch]]; + + datafusion::datasource::MemTable::try_new(schema_ref, partitions) + .context(table::CreateMemTableSnafu) + } else { + Err(Error::Load { + msg: "Failed to open excel file.".to_owned(), + }) + .context(table::LoadExcelSnafu) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::array::{BooleanArray, Float64Array, Int64Array, StringArray}; + use crate::table::{TableColumn, TableIoSource}; + use crate::test_util::*; + use datafusion::datasource::TableProvider; + use datafusion::prelude::SessionContext; + + use calamine::{Cell, DataType as ExcelDataType}; + + #[test] + fn excel_subrange_iteration() { + let range = calamine::Range::::from_sparse(vec![ + Cell::new((0, 0), ExcelDataType::Int(0)), + Cell::new((0, 1), ExcelDataType::Bool(true)), + Cell::new((0, 2), ExcelDataType::Float(0.333)), + Cell::new((1, 0), ExcelDataType::Int(1)), + Cell::new((1, 1), ExcelDataType::Bool(false)), + Cell::new((1, 2), ExcelDataType::Float(1.333)), + Cell::new((2, 0), ExcelDataType::Int(2)), + Cell::new((2, 1), ExcelDataType::Empty), + Cell::new((2, 2), ExcelDataType::Float(2.333)), + Cell::new((3, 0), ExcelDataType::Int(3)), + Cell::new((3, 1), ExcelDataType::Bool(true)), + Cell::new((3, 2), ExcelDataType::Float(3.333)), + ]); + let mut subrange = ExcelSubrange::new(&range, None, None, None, None); + assert_eq!(subrange.size(), 4); + assert_eq!( + subrange.next(), + Some( + &vec![ + ExcelDataType::Int(0), + ExcelDataType::Bool(true), + ExcelDataType::Float(0.333) + ][..] + ) + ); + assert_eq!( + subrange.next(), + Some( + &vec![ + ExcelDataType::Int(1), + ExcelDataType::Bool(false), + ExcelDataType::Float(1.333) + ][..] + ) + ); + assert_eq!( + subrange.next(), + Some( + &vec![ + ExcelDataType::Int(2), + ExcelDataType::Empty, + ExcelDataType::Float(2.333) + ][..] + ) + ); + assert_eq!( + subrange.next(), + Some( + &vec![ + ExcelDataType::Int(3), + ExcelDataType::Bool(true), + ExcelDataType::Float(3.333) + ][..] + ) + ); + assert_eq!(subrange.next(), None); + + let mut subrange = ExcelSubrange::new(&range, Some(1), Some(2), Some(1), Some(1)); + assert_eq!(subrange.size(), 2); + assert_eq!(subrange.next(), Some(&vec![ExcelDataType::Bool(false)][..])); + assert_eq!(subrange.next(), Some(&vec![ExcelDataType::Empty][..])); + assert_eq!(subrange.next(), None); + } + + #[test] + fn inferes_schema_from_data() { + let range = calamine::Range::::from_sparse(vec![ + Cell::new((0, 0), ExcelDataType::String(String::from("int_column"))), + Cell::new((0, 1), ExcelDataType::String(String::from("bool_column"))), + Cell::new((0, 2), ExcelDataType::String(String::from("float column"))), + Cell::new((0, 3), ExcelDataType::String(String::from("string_column"))), + Cell::new( + (0, 4), + ExcelDataType::String(String::from("datetime_column")), + ), + Cell::new( + (0, 5), + ExcelDataType::String(String::from("datetime iso column")), + ), + Cell::new( + (0, 6), + ExcelDataType::String(String::from("duration column")), + ), + Cell::new( + (0, 7), + ExcelDataType::String(String::from("duration iso column")), + ), + Cell::new((1, 0), ExcelDataType::Int(0)), + Cell::new((1, 1), ExcelDataType::Bool(true)), + Cell::new((1, 2), ExcelDataType::Float(0.333)), + Cell::new((1, 3), ExcelDataType::String(String::from("test"))), + Cell::new((1, 4), ExcelDataType::DateTime(44986.12)), + Cell::new((1, 5), ExcelDataType::DateTimeIso(String::from("test"))), + Cell::new((1, 6), ExcelDataType::Duration(44986.12)), + Cell::new((1, 7), ExcelDataType::DurationIso(String::from("test"))), + Cell::new((2, 0), ExcelDataType::Empty), + Cell::new((2, 0), ExcelDataType::Empty), + Cell::new((2, 1), ExcelDataType::Empty), + Cell::new((2, 2), ExcelDataType::Empty), + Cell::new((2, 3), ExcelDataType::Empty), + Cell::new((2, 4), ExcelDataType::Empty), + Cell::new((2, 5), ExcelDataType::Empty), + Cell::new((2, 6), ExcelDataType::Empty), + Cell::new((2, 7), ExcelDataType::Empty), + ]); + + let schema = infer_schema(&range, &TableOptionExcel::default(), &None).unwrap(); + + assert_eq!( + schema, + Schema::new(vec![ + Field::new("int_column", DataType::Int64, true), + Field::new("bool_column", DataType::Boolean, true), + Field::new("float_column", DataType::Float64, true), + Field::new("string_column", DataType::Utf8, true), + Field::new( + "datetime_column", + DataType::Timestamp(TimeUnit::Second, None), + true + ), + Field::new( + "datetime_iso_column", + DataType::Timestamp(TimeUnit::Second, None), + true + ), + Field::new( + "duration_column", + DataType::Duration(TimeUnit::Second), + true + ), + Field::new( + "duration_iso_column", + DataType::Duration(TimeUnit::Second), + true + ), + ]) + ); + + let range = calamine::Range::::from_sparse(vec![ + Cell::new((0, 0), ExcelDataType::String(String::from("test_column"))), + Cell::new((1, 0), ExcelDataType::Int(0)), + Cell::new((2, 0), ExcelDataType::Empty), + Cell::new((2, 0), ExcelDataType::Float(0.5)), + ]); + + let schema = infer_schema(&range, &TableOptionExcel::default(), &None).unwrap(); + + assert_eq!( + schema, + Schema::new(vec![Field::new("test_column", DataType::Utf8, true)]) + ); + + let range = calamine::Range::::from_sparse(vec![ + Cell::new((0, 0), ExcelDataType::String(String::from("int_column"))), + Cell::new((0, 1), ExcelDataType::Empty), + Cell::new((0, 2), ExcelDataType::String(String::from("float column"))), + ]); + + assert!(infer_schema(&range, &TableOptionExcel::default(), &None).is_err()); + + let range = calamine::Range::::from_sparse(vec![ + Cell::new((0, 0), ExcelDataType::String(String::from("column1"))), + Cell::new((0, 1), ExcelDataType::String(String::from("column2"))), + Cell::new((1, 0), ExcelDataType::Int(1)), + Cell::new((1, 1), ExcelDataType::Int(1)), + Cell::new((1, 3), ExcelDataType::Int(1)), + ]); + assert!(infer_schema(&range, &TableOptionExcel::default(), &None).is_err()); + } + + #[test] + fn inferes_schema_from_config() { + let range = calamine::Range::::from_sparse(vec![]); + let table_schema = TableSchema { + columns: vec![ + TableColumn { + name: String::from("float_column"), + data_type: DataType::Float64, + nullable: true, + }, + TableColumn { + name: String::from("integer_column"), + data_type: DataType::Int64, + nullable: true, + }, + ], + }; + let schema = + infer_schema(&range, &TableOptionExcel::default(), &Some(table_schema)).unwrap(); + + assert_eq!( + schema.all_fields(), + vec![ + &Field::new("float_column", DataType::Float64, true), + &Field::new("integer_column", DataType::Int64, true), + ] + ); + + let table_schema = TableSchema { + columns: vec![ + TableColumn { + name: String::from("float_column"), + data_type: DataType::Float16, + nullable: true, + }, + TableColumn { + name: String::from("integer_column"), + data_type: DataType::Int16, + nullable: true, + }, + ], + }; + assert!(infer_schema(&range, &TableOptionExcel::default(), &Some(table_schema)).is_err()); + } + + #[tokio::test] + async fn load_xlsx_with_toml_config() { + let mut table_source: TableSource = toml::from_str( + r#" +name = "test" +uri = "test_data/uk_cities_with_headers.xlsx" +[option] +format = "xlsx" +sheet_name = "uk_cities_with_headers" +"#, + ) + .unwrap(); + // patch uri path with the correct test data path + table_source.io_source = TableIoSource::Uri(test_data_path("uk_cities_with_headers.xlsx")); + + let t = to_mem_table(&table_source).await.unwrap(); + let ctx = SessionContext::new(); + let stats = t + .scan(&ctx.state(), None, &[], None) + .await + .unwrap() + .statistics(); + assert_eq!(stats.num_rows, Some(37)); + } + + #[tokio::test] + async fn load_xlsx_with_yaml_config() { + let mut table_source: TableSource = serde_yaml::from_str( + r#" +name: "test" +uri: "test_data/uk_cities_with_headers.xlsx" +option: + format: "xlsx" + sheet_name: "uk_cities_with_headers" +"#, + ) + .unwrap(); + // patch uri path with the correct test data path + table_source.io_source = TableIoSource::Uri(test_data_path("uk_cities_with_headers.xlsx")); + + let t = to_mem_table(&table_source).await.unwrap(); + let ctx = SessionContext::new(); + let stats = t + .scan(&ctx.state(), None, &[], None) + .await + .unwrap() + .statistics(); + assert_eq!(stats.num_rows, Some(37)); + } + + #[tokio::test] + async fn load_ods_with_custom_range_and_without_sheet_name() { + let mut table_source: TableSource = serde_yaml::from_str( + r#" +name: "test" +uri: "test_data/excel_range.ods" +option: + format: "ods" + rows_range_start: 2 + rows_range_end: 5 + columns_range_start: 1 + columns_range_end: 6 + schema_inference_lines: 3 +"#, + ) + .unwrap(); + // patch uri path with the correct test data path + table_source.io_source = TableIoSource::Uri(test_data_path("excel_range.ods")); + + let t = to_mem_table(&table_source).await.unwrap(); + let ctx = SessionContext::new(); + let stats = t + .scan(&ctx.state(), None, &[], None) + .await + .unwrap() + .statistics(); + assert_eq!(stats.column_statistics.unwrap().len(), 6); + assert_eq!(stats.num_rows, Some(3)); + } + + #[test] + fn transforms_excel_range_to_record_batch() { + let range: calamine::Range = + calamine::Range::::from_sparse(vec![ + Cell::new((0, 0), ExcelDataType::String("float_column".to_string())), + Cell::new((1, 0), ExcelDataType::Float(1.333)), + Cell::new((2, 0), ExcelDataType::Empty), + Cell::new((3, 0), ExcelDataType::Float(3.333)), + Cell::new((0, 1), ExcelDataType::String("integer_column".to_string())), + Cell::new((1, 1), ExcelDataType::Int(1)), + Cell::new((2, 1), ExcelDataType::Int(3)), + Cell::new((3, 1), ExcelDataType::Empty), + Cell::new((0, 2), ExcelDataType::String("boolean_column".to_string())), + Cell::new((1, 2), ExcelDataType::Empty), + Cell::new((2, 2), ExcelDataType::Bool(true)), + Cell::new((3, 2), ExcelDataType::Bool(false)), + Cell::new((0, 3), ExcelDataType::String("string_column".to_string())), + Cell::new((1, 3), ExcelDataType::String("foo".to_string())), + Cell::new((2, 3), ExcelDataType::String("bar".to_string())), + Cell::new((3, 3), ExcelDataType::String("baz".to_string())), + Cell::new((0, 4), ExcelDataType::String("mixed_column".to_string())), + Cell::new((1, 4), ExcelDataType::Float(1.1)), + Cell::new((2, 4), ExcelDataType::Int(1)), + Cell::new((3, 4), ExcelDataType::Empty), + Cell::new((0, 5), ExcelDataType::String("datetime_column".to_string())), + Cell::new((1, 5), ExcelDataType::DateTime(44986.12)), // 2023-03-01T02:52:48 + Cell::new((2, 5), ExcelDataType::Empty), + Cell::new((3, 5), ExcelDataType::DateTime(44900.12)), // 2022-12-05T02:52:48 + ]); + + let shema = infer_schema(&range, &TableOptionExcel::default(), &None).unwrap(); + let rb = excel_range_to_record_batch(range, &TableOptionExcel::default(), shema).unwrap(); + + assert_eq!( + rb.schema().all_fields(), + vec![ + &Field::new("float_column", DataType::Float64, true), + &Field::new("integer_column", DataType::Int64, true), + &Field::new("boolean_column", DataType::Boolean, true), + &Field::new("string_column", DataType::Utf8, true), + &Field::new("mixed_column", DataType::Utf8, true), + &Field::new( + "datetime_column", + DataType::Timestamp(TimeUnit::Second, None), + true + ), + ] + ); + + assert_eq!( + rb.column(0).as_ref(), + Arc::new(Float64Array::from(vec![Some(1.333), None, Some(3.333)])).as_ref(), + ); + assert_eq!( + rb.column(1).as_ref(), + Arc::new(Int64Array::from(vec![Some(1), Some(3), None])).as_ref(), + ); + assert_eq!( + rb.column(2).as_ref(), + Arc::new(BooleanArray::from(vec![None, Some(true), Some(false)])).as_ref(), + ); + assert_eq!( + rb.column(3).as_ref(), + Arc::new(StringArray::from(vec!["foo", "bar", "baz"])).as_ref(), + ); + assert_eq!( + rb.column(4).as_ref(), + Arc::new(StringArray::from(vec![Some("1.1"), Some("1"), None])).as_ref(), + ); + assert_eq!( + rb.column(5).as_ref(), + Arc::new(TimestampSecondArray::from(vec![ + Some(1677639168), // Unix timestamp for 2023-03-01T02:52:48 UTC + None, + Some(1670208768) // Unix timestamp for 2022-12-05T02:52:48 UTC + ])) + .as_ref(), + ); + } +} diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index efd1bf3..e187f47 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -21,11 +21,11 @@ pub mod arrow_ipc_stream; pub mod csv; pub mod database; pub mod delta; +pub mod excel; pub mod google_spreadsheets; pub mod json; pub mod ndjson; pub mod parquet; -pub mod xlsx; #[derive(Debug, Snafu)] pub enum Error { @@ -45,8 +45,8 @@ pub enum Error { LoadArrowIpcFile { source: arrow_ipc_file::Error }, #[snafu(display("Failed to load Google Sheet data: {source}"))] LoadGoogleSheet { source: google_spreadsheets::Error }, - #[snafu(display("Failed to load XLSX data: {source}"))] - LoadXlsx { source: xlsx::Error }, + #[snafu(display("Failed to load Excel data: {source}"))] + LoadExcel { source: excel::Error }, #[snafu(display("Failed to load database data: {source}"))] LoadDatabase { source: database::Error }, #[snafu(display("Failed to cast IO source to memory bytes for source: {table_source}"))] @@ -251,9 +251,14 @@ impl Default for TableOptionParquet { } } -#[derive(Deserialize, Debug, Clone, Eq, PartialEq)] -pub struct TableOptionXlsx { +#[derive(Deserialize, Default, Debug, Clone, Eq, PartialEq)] +pub struct TableOptionExcel { pub sheet_name: Option, + pub rows_range_start: Option, + pub rows_range_end: Option, + pub columns_range_start: Option, + pub columns_range_end: Option, + pub schema_inference_lines: Option, } #[derive(Deserialize, Debug, Clone, Eq, PartialEq)] @@ -298,7 +303,10 @@ pub enum TableLoadOption { jsonl {}, parquet(TableOptionParquet), google_spreadsheet(TableOptionGoogleSpreadsheet), - xlsx(TableOptionXlsx), + xls(TableOptionExcel), + xlsx(TableOptionExcel), + xlsb(TableOptionExcel), + ods(TableOptionExcel), delta(TableOptionDelta), arrow {}, arrows {}, @@ -317,10 +325,10 @@ impl TableLoadOption { } } - pub fn as_xlsx(&self) -> Result<&TableOptionXlsx, Error> { + pub fn as_excel(&self) -> Result<&TableOptionExcel, Error> { match self { - Self::xlsx(opt) => Ok(opt), - _ => Err(Error::ExpectFormatOption { fmt: "xlsx" }), + Self::xls(opt) | Self::xlsx(opt) | Self::xlsb(opt) | Self::ods(opt) => Ok(opt), + _ => Err(Error::ExpectFormatOption { fmt: "excel" }), } } @@ -353,7 +361,10 @@ impl TableLoadOption { Self::csv { .. } => "csv", Self::parquet { .. } => "parquet", Self::google_spreadsheet(_) | Self::delta { .. } => "", + Self::xls { .. } => "xls", Self::xlsx { .. } => "xlsx", + Self::ods { .. } => "ods", + Self::xlsb { .. } => "xlsb", Self::arrow { .. } => "arrow", Self::arrows { .. } => "arrows", Self::mysql { .. } => "mysql", @@ -537,7 +548,7 @@ impl TableSource { match Path::new(uri).extension().and_then(OsStr::to_str) { Some(ext) => match ext { "csv" | "json" | "ndjson" | "jsonl" | "parquet" | "arrow" | "arrows" - | "xlsx" => ext, + | "xls" | "xlsx" | "xlsb" | "ods" => ext, "sqlite" | "sqlite3" | "db" => "sqlite", _ => { return Err(Error::Extension { @@ -628,7 +639,10 @@ pub async fn load( TableLoadOption::google_spreadsheet(_) => { Arc::new(google_spreadsheets::to_mem_table(t).await?) } - TableLoadOption::xlsx { .. } => Arc::new(xlsx::to_mem_table(t).await?), + TableLoadOption::xlsx { .. } + | TableLoadOption::xls { .. } + | TableLoadOption::xlsb { .. } + | TableLoadOption::ods { .. } => Arc::new(excel::to_mem_table(t).await?), TableLoadOption::delta { .. } => delta::to_datafusion_table(t, dfctx).await?, TableLoadOption::arrow { .. } => { Arc::new(arrow_ipc_file::to_mem_table(t, dfctx).await?) @@ -652,6 +666,7 @@ pub async fn load( "json" => Arc::new(json::to_mem_table(t, dfctx).await?), "ndjson" | "jsonl" => Arc::new(ndjson::to_mem_table(t, dfctx).await?), "parquet" => parquet::to_datafusion_table(t, dfctx).await?, + "xls" | "xlsx" | "xlsb" | "ods" => Arc::new(excel::to_mem_table(t).await?), "arrow" => Arc::new(arrow_ipc_file::to_mem_table(t, dfctx).await?), "arrows" => Arc::new(arrow_ipc_stream::to_mem_table(t, dfctx).await?), "mysql" => Arc::new(database::DatabaseLoader::MySQL.to_mem_table(t)?), diff --git a/columnq/src/table/xlsx.rs b/columnq/src/table/xlsx.rs deleted file mode 100644 index 3efc330..0000000 --- a/columnq/src/table/xlsx.rs +++ /dev/null @@ -1,272 +0,0 @@ -use crate::table::{self, TableSource}; -use calamine::{open_workbook, Range, Reader, Xlsx}; -use datafusion::arrow::array::{ArrayRef, BooleanArray, PrimitiveArray, StringArray}; -use datafusion::arrow::datatypes::{DataType, Field, Float64Type, Int64Type, Schema}; -use datafusion::arrow::record_batch::RecordBatch; -use snafu::prelude::*; -use std::collections::{HashMap, HashSet}; -use std::sync::Arc; -use std::vec; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Failed to load XLSX: {msg}"))] - Load { msg: String }, - #[snafu(display("Failed to create record batch: {source}"))] - CreateRecordBatch { - source: datafusion::arrow::error::ArrowError, - }, - #[snafu(display("Failed to open workbook: {source}"))] - OpenWorkbook { source: calamine::XlsxError }, -} - -fn infer_value_type(v: &calamine::DataType) -> Result { - match v { - calamine::DataType::Int(_) if v.get_int().is_some() => Ok(DataType::Int64), - calamine::DataType::Float(_) if v.get_float().is_some() => Ok(DataType::Float64), - calamine::DataType::Bool(_) if v.get_bool().is_some() => Ok(DataType::Boolean), - calamine::DataType::String(_) if v.get_string().is_some() => Ok(DataType::Utf8), - calamine::DataType::Error(e) => Err(Error::Load { msg: e.to_string() }), - // TODO(upstream): support `Date64` - calamine::DataType::DateTime(_) => Err(Error::Load { - msg: "Unsupported data type: DateTime".to_owned(), - }), - calamine::DataType::Empty => Ok(DataType::Null), - _ => Err(Error::Load { - msg: "Failed to parse the cell value".to_owned(), - }), - } -} - -fn infer_schema(r: &Range) -> Result { - let mut col_types: HashMap<&str, HashSet> = HashMap::new(); - let mut rows = r.rows(); - let col_names: Result, _> = rows - .next() - .unwrap() - .iter() - .enumerate() - .map(|(i, c)| { - c.get_string().ok_or_else(|| Error::Load { - msg: format!("The {i}th column name is empty"), - }) - }) - .collect(); - - let col_names = match col_names { - Ok(values) => values, - Err(e) => return Err(e), - }; - - for row in rows { - for (i, col_val) in row.iter().enumerate() { - let col_name = col_names.get(i).unwrap(); - let col_type = infer_value_type(col_val).unwrap(); - let entry = col_types.entry(col_name).or_default(); - entry.insert(col_type); - } - } - - let fields: Vec = col_names - .iter() - .map(|col_name| { - let set = col_types.entry(col_name).or_insert_with(|| { - let mut set = HashSet::new(); - set.insert(DataType::Utf8); - set - }); - - let mut dt_iter = set.iter().cloned(); - let dt = dt_iter.next().unwrap_or(DataType::Utf8); - Field::new(col_name.replace(' ', "_"), dt, true) - }) - .collect(); - Ok(Schema::new(fields)) -} - -fn xlsx_sheet_value_to_record_batch(r: Range) -> Result { - let schema = infer_schema(&r)?; - let arrays = schema - .fields() - .iter() - .enumerate() - .map(|(i, field)| { - let rows = r.rows().skip(1); - match field.data_type() { - DataType::Boolean => Arc::new( - rows.map(|r| r.get(i).map(|v| v.get_bool().unwrap())) - .collect::(), - ) as ArrayRef, - DataType::Int64 => Arc::new( - rows.map(|r| r.get(i).map(|v| v.get_int().unwrap())) - .collect::>(), - ) as ArrayRef, - DataType::Float64 => Arc::new( - rows.map(|r| r.get(i).map(|v| v.get_float().unwrap())) - .collect::>(), - ) as ArrayRef, - _ => Arc::new( - rows.map(|r| r.get(i).map(|v| v.get_string().unwrap_or("null"))) - .collect::(), - ) as ArrayRef, - } - }) - .collect::>(); - - RecordBatch::try_new(Arc::new(schema), arrays).context(CreateRecordBatchSnafu) -} - -pub async fn to_mem_table( - t: &TableSource, -) -> Result { - let opt = t - .option - .as_ref() - .ok_or(table::Error::MissingOption {})? - .as_xlsx()?; - let uri = t.get_uri_str(); - let mut workbook: Xlsx<_> = open_workbook(uri) - .context(OpenWorkbookSnafu) - .context(table::LoadXlsxSnafu)?; - match &opt.sheet_name { - Some(sheet) => { - if let Some(Ok(r)) = workbook.worksheet_range(sheet) { - let batch = xlsx_sheet_value_to_record_batch(r).context(table::LoadXlsxSnafu)?; - let schema_ref = batch.schema(); - let partitions = vec![vec![batch]]; - Ok( - datafusion::datasource::MemTable::try_new(schema_ref, partitions) - .context(table::CreateMemTableSnafu)?, - ) - } else { - Err(Error::Load { - msg: "Failed to open .xlsx file.".to_owned(), - }) - .context(table::LoadXlsxSnafu) - } - } - None => Err(Error::Load { - msg: "`sheet_name` is not specified".to_owned(), - }) - .context(table::LoadXlsxSnafu), - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::arrow::array::{BooleanArray, Float64Array, Int64Array, StringArray}; - use crate::table::TableIoSource; - use crate::test_util::*; - use datafusion::datasource::TableProvider; - use datafusion::prelude::SessionContext; - - use calamine::{Cell, DataType as XlsxDataType, Range}; - - fn property_sheet() -> Range { - let cells: Vec> = vec![ - Cell::new((0, 0), XlsxDataType::String("float_column".to_string())), - Cell::new((1, 0), XlsxDataType::Float(1.333)), - Cell::new((2, 0), XlsxDataType::Float(3.333)), - Cell::new((0, 1), XlsxDataType::String("integer_column".to_string())), - Cell::new((1, 1), XlsxDataType::Int(1)), - Cell::new((2, 1), XlsxDataType::Int(3)), - Cell::new((0, 2), XlsxDataType::String("boolean_column".to_string())), - Cell::new((1, 2), XlsxDataType::Bool(true)), - Cell::new((2, 2), XlsxDataType::Bool(false)), - Cell::new((0, 3), XlsxDataType::String("string_column".to_string())), - Cell::new((1, 3), XlsxDataType::String("foo".to_string())), - Cell::new((2, 3), XlsxDataType::String("bar".to_string())), - ]; - calamine::Range::::from_sparse(cells) - } - - #[tokio::test] - async fn load_xlsx_with_toml_config() { - let mut table_source: TableSource = toml::from_str( - r#" -name = "test" -uri = "test_data/uk_cities_with_headers.xlsx" -[option] -format = "xlsx" -sheet_name = "uk_cities_with_headers" -"#, - ) - .unwrap(); - // patch uri path with the correct test data path - table_source.io_source = TableIoSource::Uri(test_data_path("uk_cities_with_headers.xlsx")); - - let t = to_mem_table(&table_source).await.unwrap(); - let ctx = SessionContext::new(); - let stats = t - .scan(&ctx.state(), None, &[], None) - .await - .unwrap() - .statistics(); - assert_eq!(stats.num_rows, Some(37)); - } - - #[tokio::test] - async fn load_xlsx_with_yaml_config() { - let mut table_source: TableSource = serde_yaml::from_str( - r#" -name: "test" -uri: "test_data/uk_cities_with_headers.xlsx" -option: - format: "xlsx" - sheet_name: "uk_cities_with_headers" -"#, - ) - .unwrap(); - // patch uri path with the correct test data path - table_source.io_source = TableIoSource::Uri(test_data_path("uk_cities_with_headers.xlsx")); - - let t = to_mem_table(&table_source).await.unwrap(); - let ctx = SessionContext::new(); - let stats = t - .scan(&ctx.state(), None, &[], None) - .await - .unwrap() - .statistics(); - assert_eq!(stats.num_rows, Some(37)); - } - - #[test] - fn schema_interface() { - let sheet = property_sheet(); - let schema = infer_schema(&sheet).unwrap(); - assert_eq!( - schema, - Schema::new(vec![ - Field::new("float_column", DataType::Float64, true), - Field::new("integer_column", DataType::Int64, true), - Field::new("boolean_column", DataType::Boolean, true), - Field::new("string_column", DataType::Utf8, true), - ]) - ); - } - - #[test] - fn xlsx_value_to_record_batch() { - let sheet = property_sheet(); - let rb = xlsx_sheet_value_to_record_batch(sheet).unwrap(); - - assert_eq!(rb.num_columns(), 4); - assert_eq!( - rb.column(0).as_ref(), - Arc::new(Float64Array::from(vec![1.333, 3.333])).as_ref(), - ); - assert_eq!( - rb.column(1).as_ref(), - Arc::new(Int64Array::from(vec![1, 3])).as_ref(), - ); - assert_eq!( - rb.column(2).as_ref(), - Arc::new(BooleanArray::from(vec![true, false])).as_ref(), - ); - assert_eq!( - rb.column(3).as_ref(), - Arc::new(StringArray::from(vec!["foo", "bar"])).as_ref(), - ); - } -} diff --git a/test_data/excel_range.ods b/test_data/excel_range.ods new file mode 100644 index 0000000..7f596e2 Binary files /dev/null and b/test_data/excel_range.ods differ