Update xlsx table (#316)

This PR is a fix for issue https://github.com/roapi/roapi/issues/259

List of updates/fixes:
* module xlsx renamed to excel.
* Allow reading not only xlsx format but also xls, ods, xlsb
* Allow Excel DateTime format and transform it to arrow
Timestamp(Seconds, None)
* Allow using NULLs in any data types and use null value instead of
string "null"
* Fix issue with incorrect data type inference when multiple data types
are detected.
* Add possibility to specify data schema in config.
* Add new options:
 - rows_range_start
 - rows_range_end
 - columns_range_start
 - columns_range_end
 - schema_inference_lines
* Make sheet_name optional; if it is not specified, then the first
sheet is used by default

* Bump calamine crate to version 0.23.1 and add feature "dates"
(support for the DateTime column format)

Documentation updates: https://github.com/roapi/docs/pull/20
This commit is contained in:
Maksym Dovhal 2024-02-01 06:16:34 +02:00 committed by GitHub
parent 9377b7ccad
commit 81a25205b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 816 additions and 300 deletions

28
Cargo.lock generated
View File

@ -851,15 +851,17 @@ dependencies = [
[[package]]
name = "calamine"
version = "0.19.1"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6381d1037ee9b8a6c8eb97936add0331a1aabd148d5b6f35f1cda6e5dec44f40"
checksum = "47a4d6ea525ea187df1e3a1c4b23469b1cbe60c5bafc1c0ef14b2b8738a8303d"
dependencies = [
"byteorder",
"chrono",
"codepage",
"encoding_rs",
"log",
"quick-xml 0.25.0",
"once_cell",
"quick-xml 0.31.0",
"serde",
"zip",
]
@ -3496,16 +3498,6 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "quick-xml"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58e21a144a0ffb5fad7b464babcdab934a325ad69b7c0373bcfef5cbd9799ca9"
dependencies = [
"encoding_rs",
"memchr",
]
[[package]]
name = "quick-xml"
version = "0.28.2"
@ -3516,6 +3508,16 @@ dependencies = [
"serde",
]
[[package]]
name = "quick-xml"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33"
dependencies = [
"encoding_rs",
"memchr",
]
[[package]]
name = "quote"
version = "1.0.33"

View File

@ -318,7 +318,7 @@ Data layer:
- [x] JSON
- [x] NDJSON
- [x] parquet
- [x] xls, xlsx, xlsm, ods: https://github.com/tafia/calamine
- [x] xls, xlsx, xlsb, ods: https://github.com/tafia/calamine
- [x] [DeltaLake](https://delta.io/)
Misc:

View File

@ -39,7 +39,7 @@ reqwest = { version = "0.11", default-features = false, features = [
"blocking",
"json",
] }
calamine = "0.19.1"
calamine = {version = "0.23.1", features = ["dates"]}
tokio = { version = "1", features = ["rt-multi-thread"] }
futures = "0.3"

View File

@ -39,8 +39,8 @@ pub enum ColumnQError {
#[error("Error loading Delta table: {0}")]
LoadDelta(String),
#[error("Error loading Xlsx table: {0}")]
LoadXlsx(String),
#[error("Error loading Excel table: {0}")]
LoadExcel(String),
#[error("Error loading data from HTTP store: {0}")]
HttpStore(String),

771
columnq/src/table/excel.rs Normal file
View File

@ -0,0 +1,771 @@
use crate::table::{self, TableOptionExcel, TableSchema, TableSource};
use arrow_schema::TimeUnit;
use calamine::{open_workbook_auto, DataType as ExcelDataType, Range, Reader, Sheets};
use datafusion::arrow::array::{
ArrayRef, BooleanArray, DurationSecondArray, NullArray, PrimitiveArray, StringArray,
TimestampSecondArray,
};
use datafusion::arrow::datatypes::{
DataType, Date32Type, Date64Type, Field, Float64Type, Int64Type, Schema,
};
use datafusion::arrow::record_batch::RecordBatch;
use snafu::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::vec;
/// Errors raised while turning an Excel-like workbook into an Arrow table.
#[derive(Debug, Snafu)]
pub enum Error {
/// Loading failed; `msg` carries the human-readable reason.
#[snafu(display("Failed to load Excel: {msg}"))]
Load { msg: String },
/// The user-configured schema cannot be used; `msg` explains why.
#[snafu(display("Incorrect schema: {msg}"))]
IncorrectSchema { msg: String },
/// Schema inference failed (no further detail is attached).
#[snafu(display("Excel schema inference error"))]
SchemaInference,
/// Arrow rejected the columns when the record batch was assembled.
#[snafu(display("Failed to create record batch: {source}"))]
CreateRecordBatch {
source: datafusion::arrow::error::ArrowError,
},
/// calamine could not open the workbook.
#[snafu(display("Failed to open workbook: {source}"))]
OpenWorkbook { source: calamine::Error },
}
/// Row iterator over a rectangular window of a calamine `Range`.
///
/// Each yielded row is sliced to the configured column window; iteration
/// stops once `total_rows` rows have been requested.
struct ExcelSubrange<'a> {
// Underlying row iterator, already advanced past any skipped leading rows.
rows: calamine::Rows<'a, ExcelDataType>,
// First column of the window (used as the inclusive start of the row slice).
columns_range_start: usize,
// Last column of the window (inclusive; clamped to the row width when slicing).
columns_range_end: usize,
// Number of rows this subrange is allowed to yield.
total_rows: usize,
// Number of rows requested so far via `next`.
current_row_id: usize,
}
impl<'a> ExcelSubrange<'a> {
    /// Builds a window over `range`.
    ///
    /// * `rows_range_start` - index of the first row to yield (defaults to 0).
    /// * `rows_range_end` - index of the last row to yield, inclusive
    ///   (defaults to the row of the range's last cell).
    /// * `columns_range_start` / `columns_range_end` - inclusive column
    ///   window (defaults to the whole row).
    ///
    /// An empty range, or a start row past the end row, produces a subrange
    /// that yields nothing instead of panicking.
    fn new(
        range: &'a Range<ExcelDataType>,
        rows_range_start: Option<usize>,
        rows_range_end: Option<usize>,
        columns_range_start: Option<usize>,
        columns_range_end: Option<usize>,
    ) -> ExcelSubrange<'a> {
        let first_row = rows_range_start.unwrap_or(0);

        let mut rows = range.rows();
        if first_row > 0 {
            // `nth(k)` consumes k + 1 items, so this skips exactly `first_row` rows.
            rows.nth(first_row - 1);
        }

        // `range.end()` is `None` for an empty range: nothing to read then.
        // Saturating arithmetic keeps `start > end` from underflowing.
        let requested_rows = rows_range_end
            .or_else(|| range.end().map(|(last_row, _)| last_row as usize))
            .map_or(0, |last_row| {
                last_row.saturating_add(1).saturating_sub(first_row)
            });

        // Never promise more rows than the underlying iterator reports, so
        // `size()` matches what iteration can actually deliver.
        let total_rows = match rows.size_hint().1 {
            Some(available) => requested_rows.min(available),
            None => requested_rows,
        };

        ExcelSubrange {
            rows,
            columns_range_start: columns_range_start.unwrap_or(usize::MIN),
            columns_range_end: columns_range_end.unwrap_or(usize::MAX),
            total_rows,
            current_row_id: 0,
        }
    }

    /// Number of rows this subrange will yield when iterated from the start.
    fn size(&self) -> usize {
        self.total_rows
    }
}
impl<'a> Iterator for ExcelSubrange<'a> {
    type Item = &'a [ExcelDataType];

    /// Yields the next row, sliced to the configured column window.
    ///
    /// The window is clamped to the row's width; a window that lies entirely
    /// beyond the row yields an empty slice instead of panicking.
    fn next(&mut self) -> Option<Self::Item> {
        if self.current_row_id >= self.total_rows {
            return None;
        }
        self.current_row_id += 1;

        let row = self.rows.next()?;
        // Convert the inclusive end to an exclusive one without overflowing
        // on the `usize::MAX` default, then clamp both bounds to the row.
        let end = row.len().min(self.columns_range_end.saturating_add(1));
        let start = self.columns_range_start.min(end);
        Some(&row[start..end])
    }

    /// Upper bound is the number of rows still to be yielded.
    ///
    /// NOTE(review): some Arrow collectors appear to size their buffers from
    /// this upper bound, so it is kept as tight as possible — confirm.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.total_rows.saturating_sub(self.current_row_id);
        let upper = match self.rows.size_hint().1 {
            Some(available) => remaining.min(available),
            None => remaining,
        };
        (0, Some(upper))
    }
}
fn infer_value_type(v: &ExcelDataType) -> Result<DataType, Error> {
match v {
ExcelDataType::Int(_) => Ok(DataType::Int64),
ExcelDataType::Float(_) => Ok(DataType::Float64),
ExcelDataType::String(_) => Ok(DataType::Utf8),
ExcelDataType::Bool(_) => Ok(DataType::Boolean),
ExcelDataType::DateTime(_) | ExcelDataType::DateTimeIso(_) => {
Ok(DataType::Timestamp(TimeUnit::Second, None))
}
ExcelDataType::Duration(_) | ExcelDataType::DurationIso(_) => {
Ok(DataType::Duration(TimeUnit::Second))
}
ExcelDataType::Error(e) => Err(Error::Load { msg: e.to_string() }),
ExcelDataType::Empty => Ok(DataType::Null),
}
}
fn infer_schema_from_data(mut range: ExcelSubrange) -> Result<Schema, Error> {
let mut col_types: HashMap<&str, DataType> = HashMap::new();
let col_names: Vec<&str> = range
.next()
.ok_or(Error::Load {
msg: String::from("Failed to infer schema for empty excel table"),
})?
.iter()
.enumerate()
.map(|(i, c)| {
c.get_string().ok_or_else(|| Error::Load {
msg: format!("The {i}th column name is empty"),
})
})
.collect::<Result<Vec<&str>, _>>()?;
for row in range {
for (i, col_val) in row.iter().enumerate() {
let col_name = col_names.get(i).ok_or(Error::Load {
msg: String::from(
"Failed to infer schema. Number of values in row is more then column names.",
),
})?;
let col_type = infer_value_type(col_val)?;
col_types
.entry(col_name)
.and_modify(|ct| {
if !ct.equals_datatype(&col_type) && ct.equals_datatype(&DataType::Null) {
*ct = col_type.clone();
}
// if column values has more than one not null type then we upcast column type to the most general datatype Utf8.
else if !ct.equals_datatype(&col_type)
&& !&col_type.equals_datatype(&DataType::Null)
{
*ct = DataType::Utf8;
}
})
.or_insert(col_type);
}
}
let fields: Vec<Field> = col_names
.iter()
.map(|col_name| {
let dt = col_types.get(col_name).unwrap_or(&DataType::Utf8).clone();
Field::new(col_name.replace(' ', "_"), dt, true)
})
.collect();
Ok(Schema::new(fields))
}
fn infer_schema_from_config(table_schema: &TableSchema) -> Result<Schema, Error> {
let unsupported_data_types = table_schema
.columns
.iter()
.filter(|c| {
!matches!(
c.data_type,
DataType::Boolean
| DataType::Int64
| DataType::Float64
| DataType::Duration(TimeUnit::Second)
| DataType::Date32
| DataType::Date64
| DataType::Null
| DataType::Utf8
| DataType::Timestamp(TimeUnit::Second, None)
)
})
.map(|c| c.name.clone())
.collect::<Vec<_>>()
.join(", ");
if unsupported_data_types.is_empty() {
Ok(table_schema.into())
} else {
Err(Error::IncorrectSchema{msg: format!("Configured schema for excel file contains unsupported data types in columns {}. Supported datatype: \
Boolean, Int64, Float64, Date32, Date64, !Timestamp [Second, null], !Duration [Second], Null, Utf8", unsupported_data_types)})
}
}
/// Fallback for a cell that could not be read as the column's type.
///
/// Returns `None` (an Arrow null) when the cell is empty.
///
/// # Panics
/// Panics when the cell holds a value, since that value does not match the
/// type chosen for column `field_name`.
fn empty_or_panic<T>(v: &ExcelDataType, field_name: &str) -> Option<T> {
    if v.is_empty() {
        None
    } else {
        panic!("Incorrect value {:?} in column {}", v, field_name)
    }
}
/// Produces the Arrow schema for a worksheet.
///
/// A schema supplied in the table configuration takes precedence and is only
/// validated. Otherwise the schema is inferred from the data: the header row
/// plus at most `schema_inference_lines` following rows, or the whole
/// configured row range when that option is unset.
///
/// The inference window never extends past `rows_range_end`, so rows outside
/// the configured range cannot influence the inferred types.
fn infer_schema(
    r: &Range<ExcelDataType>,
    option: &TableOptionExcel,
    schema: &Option<TableSchema>,
) -> Result<Schema, Error> {
    if let Some(configured) = schema {
        return infer_schema_from_config(configured);
    }

    let TableOptionExcel {
        rows_range_start,
        rows_range_end,
        columns_range_start,
        columns_range_end,
        schema_inference_lines,
        ..
    } = *option;

    let first_row = rows_range_start.unwrap_or(0);
    let last_row_for_schema_inference = match (schema_inference_lines, rows_range_end) {
        // The header sits on `first_row`, so `lines` data rows end at
        // `first_row + lines`; clamp that to the configured last row.
        (Some(lines), Some(last_row)) => Some(first_row.saturating_add(lines).min(last_row)),
        (Some(lines), None) => Some(first_row.saturating_add(lines)),
        (None, last_row) => last_row,
    };

    infer_schema_from_data(ExcelSubrange::new(
        r,
        rows_range_start,
        last_row_for_schema_inference,
        columns_range_start,
        columns_range_end,
    ))
}
fn excel_range_to_record_batch(
r: Range<ExcelDataType>,
option: &TableOptionExcel,
schema: Schema,
) -> Result<RecordBatch, Error> {
let TableOptionExcel {
rows_range_start,
rows_range_end,
columns_range_start,
columns_range_end,
..
} = *option;
let arrays = schema
.fields()
.iter()
.enumerate()
.map(|(i, field)| {
let rows = ExcelSubrange::new(
&r,
rows_range_start.map(|x| x + 1).or(Some(1)), // skip first row because it is header
rows_range_end,
columns_range_start,
columns_range_end,
);
let field_name = field.name();
match field.data_type() {
DataType::Boolean => Arc::new(
rows.map(|r| {
r.get(i)
.and_then(|v| v.get_bool().or_else(|| empty_or_panic(v, field_name)))
})
.collect::<BooleanArray>(),
) as ArrayRef,
DataType::Int64 => Arc::new(
rows.map(|r| {
r.get(i)
.and_then(|v| v.get_int().or_else(|| empty_or_panic(v, field_name)))
})
.collect::<PrimitiveArray<Int64Type>>(),
) as ArrayRef,
DataType::Float64 => Arc::new(
rows.map(|r| {
r.get(i)
.and_then(|v| v.get_float().or_else(|| empty_or_panic(v, field_name)))
})
.collect::<PrimitiveArray<Float64Type>>(),
) as ArrayRef,
DataType::Duration(TimeUnit::Second) => Arc::new(
rows.map(|r| {
r.get(i).and_then(|v| {
v.as_duration()
.map(|v| v.num_seconds())
.or_else(|| empty_or_panic(v, field_name))
})
})
.collect::<DurationSecondArray>(),
) as ArrayRef,
DataType::Null => Arc::new(NullArray::new(rows.size())) as ArrayRef,
DataType::Utf8 => Arc::new(
rows.map(|r| {
r.get(i).and_then(|v| match v {
ExcelDataType::Bool(x) => Some(x.to_string()),
ExcelDataType::Float(_)
| ExcelDataType::Int(_)
| ExcelDataType::String(_) => v.as_string(),
ExcelDataType::DateTime(_) | ExcelDataType::DateTimeIso(_) => {
v.as_datetime().map(|x| x.to_string())
}
ExcelDataType::Duration(_) | ExcelDataType::DurationIso(_) => {
v.as_duration().map(|x| x.to_string())
}
ExcelDataType::Empty => None,
ExcelDataType::Error(e) => Some(e.to_string()),
})
})
.collect::<StringArray>(),
) as ArrayRef,
DataType::Timestamp(TimeUnit::Second, None) => Arc::new(
rows.map(|r| {
r.get(i).and_then(|v| {
v.as_datetime()
.map(|v| v.and_utc().timestamp())
.or_else(|| empty_or_panic(v, field_name))
})
})
.collect::<TimestampSecondArray>(),
) as ArrayRef,
DataType::Date64 => Arc::new(
rows.map(|r| {
r.get(i).and_then(|v| {
v.as_datetime()
.map(|v| v.timestamp_millis())
.or_else(|| empty_or_panic(v, field_name))
})
})
.collect::<PrimitiveArray<Date64Type>>(),
) as ArrayRef,
DataType::Date32 => Arc::new(
rows.map(|r| {
r.get(i).and_then(|v| {
v.as_datetime()
.map(|v| (v.timestamp() / 86400) as i32)
.or_else(|| empty_or_panic(v, field_name))
})
})
.collect::<PrimitiveArray<Date32Type>>(),
) as ArrayRef,
unsupported => panic!("Unsupported data type for excel table {:?}", unsupported),
}
})
.collect::<Vec<ArrayRef>>();
RecordBatch::try_new(Arc::new(schema), arrays).context(CreateRecordBatchSnafu)
}
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, table::Error> {
let opt = t
.option
.as_ref()
.ok_or(table::Error::MissingOption {})?
.as_excel()?;
let uri = t.get_uri_str();
let mut workbook: Sheets<_> = open_workbook_auto(uri)
.context(OpenWorkbookSnafu)
.context(table::LoadExcelSnafu)?;
let worksheet_range = match &opt.sheet_name {
Some(sheet) => Some(workbook.worksheet_range(sheet)),
None => workbook.worksheet_range_at(0),
};
if let Some(Ok(range)) = worksheet_range {
let shema = infer_schema(&range, opt, &t.schema).context(table::LoadExcelSnafu)?;
let batch =
excel_range_to_record_batch(range, opt, shema).context(table::LoadExcelSnafu)?;
let schema_ref = batch.schema();
let partitions = vec![vec![batch]];
datafusion::datasource::MemTable::try_new(schema_ref, partitions)
.context(table::CreateMemTableSnafu)
} else {
Err(Error::Load {
msg: "Failed to open excel file.".to_owned(),
})
.context(table::LoadExcelSnafu)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::array::{BooleanArray, Float64Array, Int64Array, StringArray};
use crate::table::{TableColumn, TableIoSource};
use crate::test_util::*;
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionContext;
use calamine::{Cell, DataType as ExcelDataType};
// Checks that `ExcelSubrange` yields every row of a 4x3 range when no limits
// are given, and only the selected cells when row and column limits are set.
#[test]
fn excel_subrange_iteration() {
let range = calamine::Range::<ExcelDataType>::from_sparse(vec![
Cell::new((0, 0), ExcelDataType::Int(0)),
Cell::new((0, 1), ExcelDataType::Bool(true)),
Cell::new((0, 2), ExcelDataType::Float(0.333)),
Cell::new((1, 0), ExcelDataType::Int(1)),
Cell::new((1, 1), ExcelDataType::Bool(false)),
Cell::new((1, 2), ExcelDataType::Float(1.333)),
Cell::new((2, 0), ExcelDataType::Int(2)),
Cell::new((2, 1), ExcelDataType::Empty),
Cell::new((2, 2), ExcelDataType::Float(2.333)),
Cell::new((3, 0), ExcelDataType::Int(3)),
Cell::new((3, 1), ExcelDataType::Bool(true)),
Cell::new((3, 2), ExcelDataType::Float(3.333)),
]);
// No limits: all four rows, full width.
let mut subrange = ExcelSubrange::new(&range, None, None, None, None);
assert_eq!(subrange.size(), 4);
assert_eq!(
subrange.next(),
Some(
&vec![
ExcelDataType::Int(0),
ExcelDataType::Bool(true),
ExcelDataType::Float(0.333)
][..]
)
);
assert_eq!(
subrange.next(),
Some(
&vec![
ExcelDataType::Int(1),
ExcelDataType::Bool(false),
ExcelDataType::Float(1.333)
][..]
)
);
assert_eq!(
subrange.next(),
Some(
&vec![
ExcelDataType::Int(2),
ExcelDataType::Empty,
ExcelDataType::Float(2.333)
][..]
)
);
assert_eq!(
subrange.next(),
Some(
&vec![
ExcelDataType::Int(3),
ExcelDataType::Bool(true),
ExcelDataType::Float(3.333)
][..]
)
);
assert_eq!(subrange.next(), None);
// Rows 1..=2 and column 1 only.
let mut subrange = ExcelSubrange::new(&range, Some(1), Some(2), Some(1), Some(1));
assert_eq!(subrange.size(), 2);
assert_eq!(subrange.next(), Some(&vec![ExcelDataType::Bool(false)][..]));
assert_eq!(subrange.next(), Some(&vec![ExcelDataType::Empty][..]));
assert_eq!(subrange.next(), None);
}
// Exercises schema inference from cell data: one column per supported cell
// kind, a mixed-type column, an empty header cell, and a row wider than the
// header.
#[test]
fn inferes_schema_from_data() {
let range = calamine::Range::<ExcelDataType>::from_sparse(vec![
Cell::new((0, 0), ExcelDataType::String(String::from("int_column"))),
Cell::new((0, 1), ExcelDataType::String(String::from("bool_column"))),
Cell::new((0, 2), ExcelDataType::String(String::from("float column"))),
Cell::new((0, 3), ExcelDataType::String(String::from("string_column"))),
Cell::new(
(0, 4),
ExcelDataType::String(String::from("datetime_column")),
),
Cell::new(
(0, 5),
ExcelDataType::String(String::from("datetime iso column")),
),
Cell::new(
(0, 6),
ExcelDataType::String(String::from("duration column")),
),
Cell::new(
(0, 7),
ExcelDataType::String(String::from("duration iso column")),
),
Cell::new((1, 0), ExcelDataType::Int(0)),
Cell::new((1, 1), ExcelDataType::Bool(true)),
Cell::new((1, 2), ExcelDataType::Float(0.333)),
Cell::new((1, 3), ExcelDataType::String(String::from("test"))),
Cell::new((1, 4), ExcelDataType::DateTime(44986.12)),
Cell::new((1, 5), ExcelDataType::DateTimeIso(String::from("test"))),
Cell::new((1, 6), ExcelDataType::Duration(44986.12)),
Cell::new((1, 7), ExcelDataType::DurationIso(String::from("test"))),
// NOTE(review): cell (2, 0) is listed twice below — confirm this is intended.
Cell::new((2, 0), ExcelDataType::Empty),
Cell::new((2, 0), ExcelDataType::Empty),
Cell::new((2, 1), ExcelDataType::Empty),
Cell::new((2, 2), ExcelDataType::Empty),
Cell::new((2, 3), ExcelDataType::Empty),
Cell::new((2, 4), ExcelDataType::Empty),
Cell::new((2, 5), ExcelDataType::Empty),
Cell::new((2, 6), ExcelDataType::Empty),
Cell::new((2, 7), ExcelDataType::Empty),
]);
let schema = infer_schema(&range, &TableOptionExcel::default(), &None).unwrap();
// Spaces in header names are expected to become underscores.
assert_eq!(
schema,
Schema::new(vec![
Field::new("int_column", DataType::Int64, true),
Field::new("bool_column", DataType::Boolean, true),
Field::new("float_column", DataType::Float64, true),
Field::new("string_column", DataType::Utf8, true),
Field::new(
"datetime_column",
DataType::Timestamp(TimeUnit::Second, None),
true
),
Field::new(
"datetime_iso_column",
DataType::Timestamp(TimeUnit::Second, None),
true
),
Field::new(
"duration_column",
DataType::Duration(TimeUnit::Second),
true
),
Field::new(
"duration_iso_column",
DataType::Duration(TimeUnit::Second),
true
),
])
);
// A column holding both Int and Float values is expected to widen to Utf8.
// NOTE(review): cell (2, 0) appears twice here as well — confirm the second
// entry was not meant to be (3, 0).
let range = calamine::Range::<ExcelDataType>::from_sparse(vec![
Cell::new((0, 0), ExcelDataType::String(String::from("test_column"))),
Cell::new((1, 0), ExcelDataType::Int(0)),
Cell::new((2, 0), ExcelDataType::Empty),
Cell::new((2, 0), ExcelDataType::Float(0.5)),
]);
let schema = infer_schema(&range, &TableOptionExcel::default(), &None).unwrap();
assert_eq!(
schema,
Schema::new(vec![Field::new("test_column", DataType::Utf8, true)])
);
// An empty header cell must be rejected.
let range = calamine::Range::<ExcelDataType>::from_sparse(vec![
Cell::new((0, 0), ExcelDataType::String(String::from("int_column"))),
Cell::new((0, 1), ExcelDataType::Empty),
Cell::new((0, 2), ExcelDataType::String(String::from("float column"))),
]);
assert!(infer_schema(&range, &TableOptionExcel::default(), &None).is_err());
// A data row that reaches past the header columns must be rejected.
let range = calamine::Range::<ExcelDataType>::from_sparse(vec![
Cell::new((0, 0), ExcelDataType::String(String::from("column1"))),
Cell::new((0, 1), ExcelDataType::String(String::from("column2"))),
Cell::new((1, 0), ExcelDataType::Int(1)),
Cell::new((1, 1), ExcelDataType::Int(1)),
Cell::new((1, 3), ExcelDataType::Int(1)),
]);
assert!(infer_schema(&range, &TableOptionExcel::default(), &None).is_err());
}
// Checks that a configured schema is used as-is when its data types are
// supported, and rejected when they are not.
#[test]
fn inferes_schema_from_config() {
// The range is empty on purpose: with a configured schema no data is read.
let range = calamine::Range::<ExcelDataType>::from_sparse(vec![]);
let table_schema = TableSchema {
columns: vec![
TableColumn {
name: String::from("float_column"),
data_type: DataType::Float64,
nullable: true,
},
TableColumn {
name: String::from("integer_column"),
data_type: DataType::Int64,
nullable: true,
},
],
};
let schema =
infer_schema(&range, &TableOptionExcel::default(), &Some(table_schema)).unwrap();
assert_eq!(
schema.all_fields(),
vec![
&Field::new("float_column", DataType::Float64, true),
&Field::new("integer_column", DataType::Int64, true),
]
);
// Float16 and Int16 are not in the supported set, so this must fail.
let table_schema = TableSchema {
columns: vec![
TableColumn {
name: String::from("float_column"),
data_type: DataType::Float16,
nullable: true,
},
TableColumn {
name: String::from("integer_column"),
data_type: DataType::Int16,
nullable: true,
},
],
};
assert!(infer_schema(&range, &TableOptionExcel::default(), &Some(table_schema)).is_err());
}
// Loads the xlsx fixture through a TOML table definition and checks the
// number of rows reported by the scan statistics.
#[tokio::test]
async fn load_xlsx_with_toml_config() {
let mut table_source: TableSource = toml::from_str(
r#"
name = "test"
uri = "test_data/uk_cities_with_headers.xlsx"
[option]
format = "xlsx"
sheet_name = "uk_cities_with_headers"
"#,
)
.unwrap();
// patch uri path with the correct test data path
table_source.io_source = TableIoSource::Uri(test_data_path("uk_cities_with_headers.xlsx"));
let t = to_mem_table(&table_source).await.unwrap();
let ctx = SessionContext::new();
let stats = t
.scan(&ctx.state(), None, &[], None)
.await
.unwrap()
.statistics();
assert_eq!(stats.num_rows, Some(37));
}
// Same as the TOML variant, but the table definition is parsed from YAML.
#[tokio::test]
async fn load_xlsx_with_yaml_config() {
let mut table_source: TableSource = serde_yaml::from_str(
r#"
name: "test"
uri: "test_data/uk_cities_with_headers.xlsx"
option:
format: "xlsx"
sheet_name: "uk_cities_with_headers"
"#,
)
.unwrap();
// patch uri path with the correct test data path
table_source.io_source = TableIoSource::Uri(test_data_path("uk_cities_with_headers.xlsx"));
let t = to_mem_table(&table_source).await.unwrap();
let ctx = SessionContext::new();
let stats = t
.scan(&ctx.state(), None, &[], None)
.await
.unwrap()
.statistics();
assert_eq!(stats.num_rows, Some(37));
}
// Loads an ods fixture with explicit row/column limits and no sheet name
// (so the first sheet is used), then checks the resulting table shape.
#[tokio::test]
async fn load_ods_with_custom_range_and_without_sheet_name() {
let mut table_source: TableSource = serde_yaml::from_str(
r#"
name: "test"
uri: "test_data/excel_range.ods"
option:
format: "ods"
rows_range_start: 2
rows_range_end: 5
columns_range_start: 1
columns_range_end: 6
schema_inference_lines: 3
"#,
)
.unwrap();
// patch uri path with the correct test data path
table_source.io_source = TableIoSource::Uri(test_data_path("excel_range.ods"));
let t = to_mem_table(&table_source).await.unwrap();
let ctx = SessionContext::new();
let stats = t
.scan(&ctx.state(), None, &[], None)
.await
.unwrap()
.statistics();
// Columns 1..=6 give six columns; rows 2..=5 are one header plus three data rows.
assert_eq!(stats.column_statistics.unwrap().len(), 6);
assert_eq!(stats.num_rows, Some(3));
}
// End-to-end check of schema inference plus record batch construction on an
// in-memory range with empty cells, a mixed-type column and datetimes.
#[test]
fn transforms_excel_range_to_record_batch() {
let range: calamine::Range<ExcelDataType> =
calamine::Range::<ExcelDataType>::from_sparse(vec![
Cell::new((0, 0), ExcelDataType::String("float_column".to_string())),
Cell::new((1, 0), ExcelDataType::Float(1.333)),
Cell::new((2, 0), ExcelDataType::Empty),
Cell::new((3, 0), ExcelDataType::Float(3.333)),
Cell::new((0, 1), ExcelDataType::String("integer_column".to_string())),
Cell::new((1, 1), ExcelDataType::Int(1)),
Cell::new((2, 1), ExcelDataType::Int(3)),
Cell::new((3, 1), ExcelDataType::Empty),
Cell::new((0, 2), ExcelDataType::String("boolean_column".to_string())),
Cell::new((1, 2), ExcelDataType::Empty),
Cell::new((2, 2), ExcelDataType::Bool(true)),
Cell::new((3, 2), ExcelDataType::Bool(false)),
Cell::new((0, 3), ExcelDataType::String("string_column".to_string())),
Cell::new((1, 3), ExcelDataType::String("foo".to_string())),
Cell::new((2, 3), ExcelDataType::String("bar".to_string())),
Cell::new((3, 3), ExcelDataType::String("baz".to_string())),
Cell::new((0, 4), ExcelDataType::String("mixed_column".to_string())),
Cell::new((1, 4), ExcelDataType::Float(1.1)),
Cell::new((2, 4), ExcelDataType::Int(1)),
Cell::new((3, 4), ExcelDataType::Empty),
Cell::new((0, 5), ExcelDataType::String("datetime_column".to_string())),
Cell::new((1, 5), ExcelDataType::DateTime(44986.12)), // 2023-03-01T02:52:48
Cell::new((2, 5), ExcelDataType::Empty),
Cell::new((3, 5), ExcelDataType::DateTime(44900.12)), // 2022-12-05T02:52:48
]);
let shema = infer_schema(&range, &TableOptionExcel::default(), &None).unwrap();
let rb = excel_range_to_record_batch(range, &TableOptionExcel::default(), shema).unwrap();
// The mixed Float/Int column is expected to be widened to Utf8.
assert_eq!(
rb.schema().all_fields(),
vec![
&Field::new("float_column", DataType::Float64, true),
&Field::new("integer_column", DataType::Int64, true),
&Field::new("boolean_column", DataType::Boolean, true),
&Field::new("string_column", DataType::Utf8, true),
&Field::new("mixed_column", DataType::Utf8, true),
&Field::new(
"datetime_column",
DataType::Timestamp(TimeUnit::Second, None),
true
),
]
);
// Empty cells are expected to surface as nulls in every column.
assert_eq!(
rb.column(0).as_ref(),
Arc::new(Float64Array::from(vec![Some(1.333), None, Some(3.333)])).as_ref(),
);
assert_eq!(
rb.column(1).as_ref(),
Arc::new(Int64Array::from(vec![Some(1), Some(3), None])).as_ref(),
);
assert_eq!(
rb.column(2).as_ref(),
Arc::new(BooleanArray::from(vec![None, Some(true), Some(false)])).as_ref(),
);
assert_eq!(
rb.column(3).as_ref(),
Arc::new(StringArray::from(vec!["foo", "bar", "baz"])).as_ref(),
);
assert_eq!(
rb.column(4).as_ref(),
Arc::new(StringArray::from(vec![Some("1.1"), Some("1"), None])).as_ref(),
);
assert_eq!(
rb.column(5).as_ref(),
Arc::new(TimestampSecondArray::from(vec![
Some(1677639168), // Unix timestamp for 2023-03-01T02:52:48 UTC
None,
Some(1670208768) // Unix timestamp for 2022-12-05T02:52:48 UTC
]))
.as_ref(),
);
}
}

View File

@ -21,11 +21,11 @@ pub mod arrow_ipc_stream;
pub mod csv;
pub mod database;
pub mod delta;
pub mod excel;
pub mod google_spreadsheets;
pub mod json;
pub mod ndjson;
pub mod parquet;
pub mod xlsx;
#[derive(Debug, Snafu)]
pub enum Error {
@ -45,8 +45,8 @@ pub enum Error {
LoadArrowIpcFile { source: arrow_ipc_file::Error },
#[snafu(display("Failed to load Google Sheet data: {source}"))]
LoadGoogleSheet { source: google_spreadsheets::Error },
#[snafu(display("Failed to load XLSX data: {source}"))]
LoadXlsx { source: xlsx::Error },
#[snafu(display("Failed to load Excel data: {source}"))]
LoadExcel { source: excel::Error },
#[snafu(display("Failed to load database data: {source}"))]
LoadDatabase { source: database::Error },
#[snafu(display("Failed to cast IO source to memory bytes for source: {table_source}"))]
@ -251,9 +251,14 @@ impl Default for TableOptionParquet {
}
}
#[derive(Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct TableOptionXlsx {
#[derive(Deserialize, Default, Debug, Clone, Eq, PartialEq)]
pub struct TableOptionExcel {
pub sheet_name: Option<String>,
pub rows_range_start: Option<usize>,
pub rows_range_end: Option<usize>,
pub columns_range_start: Option<usize>,
pub columns_range_end: Option<usize>,
pub schema_inference_lines: Option<usize>,
}
#[derive(Deserialize, Debug, Clone, Eq, PartialEq)]
@ -298,7 +303,10 @@ pub enum TableLoadOption {
jsonl {},
parquet(TableOptionParquet),
google_spreadsheet(TableOptionGoogleSpreadsheet),
xlsx(TableOptionXlsx),
xls(TableOptionExcel),
xlsx(TableOptionExcel),
xlsb(TableOptionExcel),
ods(TableOptionExcel),
delta(TableOptionDelta),
arrow {},
arrows {},
@ -317,10 +325,10 @@ impl TableLoadOption {
}
}
pub fn as_xlsx(&self) -> Result<&TableOptionXlsx, Error> {
pub fn as_excel(&self) -> Result<&TableOptionExcel, Error> {
match self {
Self::xlsx(opt) => Ok(opt),
_ => Err(Error::ExpectFormatOption { fmt: "xlsx" }),
Self::xls(opt) | Self::xlsx(opt) | Self::xlsb(opt) | Self::ods(opt) => Ok(opt),
_ => Err(Error::ExpectFormatOption { fmt: "excel" }),
}
}
@ -353,7 +361,10 @@ impl TableLoadOption {
Self::csv { .. } => "csv",
Self::parquet { .. } => "parquet",
Self::google_spreadsheet(_) | Self::delta { .. } => "",
Self::xls { .. } => "xls",
Self::xlsx { .. } => "xlsx",
Self::ods { .. } => "ods",
Self::xlsb { .. } => "xlsb",
Self::arrow { .. } => "arrow",
Self::arrows { .. } => "arrows",
Self::mysql { .. } => "mysql",
@ -537,7 +548,7 @@ impl TableSource {
match Path::new(uri).extension().and_then(OsStr::to_str) {
Some(ext) => match ext {
"csv" | "json" | "ndjson" | "jsonl" | "parquet" | "arrow" | "arrows"
| "xlsx" => ext,
| "xls" | "xlsx" | "xlsb" | "ods" => ext,
"sqlite" | "sqlite3" | "db" => "sqlite",
_ => {
return Err(Error::Extension {
@ -628,7 +639,10 @@ pub async fn load(
TableLoadOption::google_spreadsheet(_) => {
Arc::new(google_spreadsheets::to_mem_table(t).await?)
}
TableLoadOption::xlsx { .. } => Arc::new(xlsx::to_mem_table(t).await?),
TableLoadOption::xlsx { .. }
| TableLoadOption::xls { .. }
| TableLoadOption::xlsb { .. }
| TableLoadOption::ods { .. } => Arc::new(excel::to_mem_table(t).await?),
TableLoadOption::delta { .. } => delta::to_datafusion_table(t, dfctx).await?,
TableLoadOption::arrow { .. } => {
Arc::new(arrow_ipc_file::to_mem_table(t, dfctx).await?)
@ -652,6 +666,7 @@ pub async fn load(
"json" => Arc::new(json::to_mem_table(t, dfctx).await?),
"ndjson" | "jsonl" => Arc::new(ndjson::to_mem_table(t, dfctx).await?),
"parquet" => parquet::to_datafusion_table(t, dfctx).await?,
"xls" | "xlsx" | "xlsb" | "ods" => Arc::new(excel::to_mem_table(t).await?),
"arrow" => Arc::new(arrow_ipc_file::to_mem_table(t, dfctx).await?),
"arrows" => Arc::new(arrow_ipc_stream::to_mem_table(t, dfctx).await?),
"mysql" => Arc::new(database::DatabaseLoader::MySQL.to_mem_table(t)?),

View File

@ -1,272 +0,0 @@
use crate::table::{self, TableSource};
use calamine::{open_workbook, Range, Reader, Xlsx};
use datafusion::arrow::array::{ArrayRef, BooleanArray, PrimitiveArray, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Float64Type, Int64Type, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use snafu::prelude::*;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::vec;
/// Errors raised while loading an XLSX workbook into a table.
#[derive(Debug, Snafu)]
pub enum Error {
/// Loading failed; `msg` carries the human-readable reason.
#[snafu(display("Failed to load XLSX: {msg}"))]
Load { msg: String },
/// Arrow rejected the columns when the record batch was assembled.
#[snafu(display("Failed to create record batch: {source}"))]
CreateRecordBatch {
source: datafusion::arrow::error::ArrowError,
},
/// calamine could not open the workbook.
#[snafu(display("Failed to open workbook: {source}"))]
OpenWorkbook { source: calamine::XlsxError },
}
fn infer_value_type(v: &calamine::DataType) -> Result<DataType, Error> {
match v {
calamine::DataType::Int(_) if v.get_int().is_some() => Ok(DataType::Int64),
calamine::DataType::Float(_) if v.get_float().is_some() => Ok(DataType::Float64),
calamine::DataType::Bool(_) if v.get_bool().is_some() => Ok(DataType::Boolean),
calamine::DataType::String(_) if v.get_string().is_some() => Ok(DataType::Utf8),
calamine::DataType::Error(e) => Err(Error::Load { msg: e.to_string() }),
// TODO(upstream): support `Date64`
calamine::DataType::DateTime(_) => Err(Error::Load {
msg: "Unsupported data type: DateTime".to_owned(),
}),
calamine::DataType::Empty => Ok(DataType::Null),
_ => Err(Error::Load {
msg: "Failed to parse the cell value".to_owned(),
}),
}
}
fn infer_schema(r: &Range<calamine::DataType>) -> Result<Schema, Error> {
let mut col_types: HashMap<&str, HashSet<DataType>> = HashMap::new();
let mut rows = r.rows();
let col_names: Result<Vec<&str>, _> = rows
.next()
.unwrap()
.iter()
.enumerate()
.map(|(i, c)| {
c.get_string().ok_or_else(|| Error::Load {
msg: format!("The {i}th column name is empty"),
})
})
.collect();
let col_names = match col_names {
Ok(values) => values,
Err(e) => return Err(e),
};
for row in rows {
for (i, col_val) in row.iter().enumerate() {
let col_name = col_names.get(i).unwrap();
let col_type = infer_value_type(col_val).unwrap();
let entry = col_types.entry(col_name).or_default();
entry.insert(col_type);
}
}
let fields: Vec<Field> = col_names
.iter()
.map(|col_name| {
let set = col_types.entry(col_name).or_insert_with(|| {
let mut set = HashSet::new();
set.insert(DataType::Utf8);
set
});
let mut dt_iter = set.iter().cloned();
let dt = dt_iter.next().unwrap_or(DataType::Utf8);
Field::new(col_name.replace(' ', "_"), dt, true)
})
.collect();
Ok(Schema::new(fields))
}
/// Converts a worksheet range into one Arrow record batch.
///
/// The schema is inferred from the range itself; the first row is the header
/// and is skipped when the columns are built. Columns that are not Boolean,
/// Int64 or Float64 are stored as strings, with the literal text `"null"`
/// for cells that are not strings.
///
/// # Panics
/// Panics when a cell in a Boolean, Int64 or Float64 column cannot be read
/// as that type.
fn xlsx_sheet_value_to_record_batch(r: Range<calamine::DataType>) -> Result<RecordBatch, Error> {
    let schema = infer_schema(&r)?;

    let mut arrays: Vec<ArrayRef> = Vec::new();
    for (i, field) in schema.fields().iter().enumerate() {
        // Fresh iterator per column, starting after the header row.
        let data_rows = r.rows().skip(1);
        let column = match field.data_type() {
            DataType::Boolean => {
                let values: BooleanArray = data_rows
                    .map(|row| row.get(i).map(|cell| cell.get_bool().unwrap()))
                    .collect();
                Arc::new(values) as ArrayRef
            }
            DataType::Int64 => {
                let values: PrimitiveArray<Int64Type> = data_rows
                    .map(|row| row.get(i).map(|cell| cell.get_int().unwrap()))
                    .collect();
                Arc::new(values) as ArrayRef
            }
            DataType::Float64 => {
                let values: PrimitiveArray<Float64Type> = data_rows
                    .map(|row| row.get(i).map(|cell| cell.get_float().unwrap()))
                    .collect();
                Arc::new(values) as ArrayRef
            }
            _ => {
                let values: StringArray = data_rows
                    .map(|row| row.get(i).map(|cell| cell.get_string().unwrap_or("null")))
                    .collect();
                Arc::new(values) as ArrayRef
            }
        };
        arrays.push(column);
    }

    RecordBatch::try_new(Arc::new(schema), arrays).context(CreateRecordBatchSnafu)
}
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, table::Error> {
let opt = t
.option
.as_ref()
.ok_or(table::Error::MissingOption {})?
.as_xlsx()?;
let uri = t.get_uri_str();
let mut workbook: Xlsx<_> = open_workbook(uri)
.context(OpenWorkbookSnafu)
.context(table::LoadXlsxSnafu)?;
match &opt.sheet_name {
Some(sheet) => {
if let Some(Ok(r)) = workbook.worksheet_range(sheet) {
let batch = xlsx_sheet_value_to_record_batch(r).context(table::LoadXlsxSnafu)?;
let schema_ref = batch.schema();
let partitions = vec![vec![batch]];
Ok(
datafusion::datasource::MemTable::try_new(schema_ref, partitions)
.context(table::CreateMemTableSnafu)?,
)
} else {
Err(Error::Load {
msg: "Failed to open .xlsx file.".to_owned(),
})
.context(table::LoadXlsxSnafu)
}
}
None => Err(Error::Load {
msg: "`sheet_name` is not specified".to_owned(),
})
.context(table::LoadXlsxSnafu),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::array::{BooleanArray, Float64Array, Int64Array, StringArray};
use crate::table::TableIoSource;
use crate::test_util::*;
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionContext;
use calamine::{Cell, DataType as XlsxDataType, Range};
// Builds a small in-memory sheet: a header row plus two data rows, with one
// float, one integer, one boolean and one string column.
fn property_sheet() -> Range<XlsxDataType> {
let cells: Vec<Cell<XlsxDataType>> = vec![
Cell::new((0, 0), XlsxDataType::String("float_column".to_string())),
Cell::new((1, 0), XlsxDataType::Float(1.333)),
Cell::new((2, 0), XlsxDataType::Float(3.333)),
Cell::new((0, 1), XlsxDataType::String("integer_column".to_string())),
Cell::new((1, 1), XlsxDataType::Int(1)),
Cell::new((2, 1), XlsxDataType::Int(3)),
Cell::new((0, 2), XlsxDataType::String("boolean_column".to_string())),
Cell::new((1, 2), XlsxDataType::Bool(true)),
Cell::new((2, 2), XlsxDataType::Bool(false)),
Cell::new((0, 3), XlsxDataType::String("string_column".to_string())),
Cell::new((1, 3), XlsxDataType::String("foo".to_string())),
Cell::new((2, 3), XlsxDataType::String("bar".to_string())),
];
calamine::Range::<calamine::DataType>::from_sparse(cells)
}
// Loads the xlsx fixture through a TOML table definition and checks the
// number of rows reported by the scan statistics.
#[tokio::test]
async fn load_xlsx_with_toml_config() {
let mut table_source: TableSource = toml::from_str(
r#"
name = "test"
uri = "test_data/uk_cities_with_headers.xlsx"
[option]
format = "xlsx"
sheet_name = "uk_cities_with_headers"
"#,
)
.unwrap();
// patch uri path with the correct test data path
table_source.io_source = TableIoSource::Uri(test_data_path("uk_cities_with_headers.xlsx"));
let t = to_mem_table(&table_source).await.unwrap();
let ctx = SessionContext::new();
let stats = t
.scan(&ctx.state(), None, &[], None)
.await
.unwrap()
.statistics();
assert_eq!(stats.num_rows, Some(37));
}
// Same as the TOML variant, but the table definition is parsed from YAML.
#[tokio::test]
async fn load_xlsx_with_yaml_config() {
let mut table_source: TableSource = serde_yaml::from_str(
r#"
name: "test"
uri: "test_data/uk_cities_with_headers.xlsx"
option:
format: "xlsx"
sheet_name: "uk_cities_with_headers"
"#,
)
.unwrap();
// patch uri path with the correct test data path
table_source.io_source = TableIoSource::Uri(test_data_path("uk_cities_with_headers.xlsx"));
let t = to_mem_table(&table_source).await.unwrap();
let ctx = SessionContext::new();
let stats = t
.scan(&ctx.state(), None, &[], None)
.await
.unwrap()
.statistics();
assert_eq!(stats.num_rows, Some(37));
}
// Checks that schema inference on the sample sheet yields one nullable field
// per column with the expected Arrow type.
#[test]
fn schema_interface() {
let sheet = property_sheet();
let schema = infer_schema(&sheet).unwrap();
assert_eq!(
schema,
Schema::new(vec![
Field::new("float_column", DataType::Float64, true),
Field::new("integer_column", DataType::Int64, true),
Field::new("boolean_column", DataType::Boolean, true),
Field::new("string_column", DataType::Utf8, true),
])
);
}
// Checks that the sample sheet converts to a four-column record batch whose
// columns hold the two data rows (the header row is not included).
#[test]
fn xlsx_value_to_record_batch() {
let sheet = property_sheet();
let rb = xlsx_sheet_value_to_record_batch(sheet).unwrap();
assert_eq!(rb.num_columns(), 4);
assert_eq!(
rb.column(0).as_ref(),
Arc::new(Float64Array::from(vec![1.333, 3.333])).as_ref(),
);
assert_eq!(
rb.column(1).as_ref(),
Arc::new(Int64Array::from(vec![1, 3])).as_ref(),
);
assert_eq!(
rb.column(2).as_ref(),
Arc::new(BooleanArray::from(vec![true, false])).as_ref(),
);
assert_eq!(
rb.column(3).as_ref(),
Arc::new(StringArray::from(vec!["foo", "bar"])).as_ref(),
);
}
}

BIN
test_data/excel_range.ods Normal file

Binary file not shown.