support loading multiple partitions (#6)

This commit is contained in:
QP Hou 2021-02-21 01:09:39 -08:00 committed by GitHub
parent 2f4646bbb4
commit 2c39fb13e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 336 additions and 132 deletions

60
Cargo.lock generated
View File

@ -293,7 +293,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "arrow"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/houqp/arrow.git?rev=e5b567d3582ba9fa91b29ab1c944de97b491bacb#e5b567d3582ba9fa91b29ab1c944de97b491bacb"
source = "git+https://github.com/houqp/arrow.git?rev=555d12458a1c32301e0ce179143577f34fe78b6e#555d12458a1c32301e0ce179143577f34fe78b6e"
dependencies = [
"cfg_aliases",
"chrono",
@ -601,6 +601,7 @@ dependencies = [
"serde_json",
"serde_yaml",
"sqlparser 0.7.0",
"tempdir",
"thiserror",
"tokio",
"uriparse",
@ -778,7 +779,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/houqp/arrow.git?rev=e5b567d3582ba9fa91b29ab1c944de97b491bacb#e5b567d3582ba9fa91b29ab1c944de97b491bacb"
source = "git+https://github.com/houqp/arrow.git?rev=555d12458a1c32301e0ce179143577f34fe78b6e#555d12458a1c32301e0ce179143577f34fe78b6e"
dependencies = [
"ahash 0.7.0",
"arrow",
@ -963,6 +964,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
[[package]]
name = "futures"
version = "0.3.12"
@ -1711,7 +1718,7 @@ dependencies = [
[[package]]
name = "parquet"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/houqp/arrow.git?rev=e5b567d3582ba9fa91b29ab1c944de97b491bacb#e5b567d3582ba9fa91b29ab1c944de97b491bacb"
source = "git+https://github.com/houqp/arrow.git?rev=555d12458a1c32301e0ce179143577f34fe78b6e#555d12458a1c32301e0ce179143577f34fe78b6e"
dependencies = [
"arrow",
"base64 0.12.3",
@ -1880,6 +1887,19 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293"
dependencies = [
"fuchsia-cprng",
"libc",
"rand_core 0.3.1",
"rdrand",
"winapi",
]
[[package]]
name = "rand"
version = "0.7.3"
@ -1925,6 +1945,21 @@ dependencies = [
"rand_core 0.6.2",
]
[[package]]
name = "rand_core"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
dependencies = [
"rand_core 0.4.2",
]
[[package]]
name = "rand_core"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
[[package]]
name = "rand_core"
version = "0.5.1"
@ -1961,6 +1996,15 @@ dependencies = [
"rand_core 0.6.2",
]
[[package]]
name = "rdrand"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "redox_syscall"
version = "0.1.57"
@ -2485,6 +2529,16 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "tempdir"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8"
dependencies = [
"rand 0.4.6",
"remove_dir_all",
]
[[package]]
name = "tempfile"
version = "3.2.0"

View File

@ -6,9 +6,9 @@ members = [
]
[patch.crates-io]
arrow = { git = "https://github.com/houqp/arrow.git", rev = "e5b567d3582ba9fa91b29ab1c944de97b491bacb" }
parquet = { git = "https://github.com/houqp/arrow.git", rev = "e5b567d3582ba9fa91b29ab1c944de97b491bacb" }
datafusion = { git = "https://github.com/houqp/arrow.git", rev = "e5b567d3582ba9fa91b29ab1c944de97b491bacb" }
arrow = { git = "https://github.com/houqp/arrow.git", rev = "555d12458a1c32301e0ce179143577f34fe78b6e" }
parquet = { git = "https://github.com/houqp/arrow.git", rev = "555d12458a1c32301e0ce179143577f34fe78b6e" }
datafusion = { git = "https://github.com/houqp/arrow.git", rev = "555d12458a1c32301e0ce179143577f34fe78b6e" }
actix-web = { git = "https://github.com/actix/actix-web.git", rev = "b37669cb3b47eeb60c161ff12156d1c877af91a2" }
actix-http = { git = "https://github.com/actix/actix-web.git", rev = "b37669cb3b47eeb60c161ff12156d1c877af91a2" }

View File

@ -242,7 +242,6 @@ Data layer:
- [x] JSON
- [x] parquet
- [ ] xls, xlsx, xlsm, ods: https://github.com/tafia/calamine
- [ ] load multiple partitions from a folder
Misc:
- [ ] auto gen OpenApi doc for rest layer

View File

@ -32,6 +32,7 @@ tokio = "1"
[dev-dependencies]
anyhow = "1"
serde_yaml = "0.8"
tempdir = "0"
[features]
simd = ["arrow/simd", "datafusion/simd"]

View File

@ -1,29 +1,48 @@
use std::fs;
use datafusion::physical_plan::common::build_file_list;
use uriparse::URIReference;
use crate::error::ColumnQError;
use crate::table::TableSource;
pub fn partitions_from_fs_uri<F, T>(
uri: &URIReference,
/// Reads every partition file of a filesystem-backed table and returns one `T` per file.
///
/// The path component of `uri` and a dot-prefixed `t.extension()` are handed to
/// `build_file_list`, which fills the list of files to read (presumably every file
/// under the path with a matching extension; confirm against the datafusion helper).
/// Each listed file is opened and passed to `partition_reader`; the results are
/// collected in listing order and the first failure aborts the whole load.
///
/// # Errors
///
/// Propagates errors from `t.extension()`, `build_file_list` and `partition_reader`,
/// and returns `ColumnQError::FileStore` when a listed file cannot be opened.
pub fn partitions_from_fs_uri<'a, F, T>(
t: &'a TableSource,
uri: URIReference<'a>,
mut partition_reader: F,
) -> Result<T, ColumnQError>
) -> Result<Vec<T>, ColumnQError>
where
F: FnMut(std::fs::File) -> Result<T, ColumnQError>,
{
let reader = fs::File::open(uri.path().to_string())
.map_err(|e| ColumnQError::FileStore(format!("open file error: {}", e)))?;
partition_reader(reader)
let fs_path = uri.path().to_string();
let mut files = vec![];
let mut file_ext = ".".to_string();
file_ext.push_str(t.extension()?);
build_file_list(&fs_path, &mut files, &file_ext)?;
// TODO: load partitions in parallel
let partitions = files
.iter()
.map(|fpath| {
let reader = fs::File::open(fpath)
.map_err(|e| ColumnQError::FileStore(format!("open file error: {}", e)))?;
Ok(partition_reader(reader)?)
})
.collect::<Result<Vec<T>, ColumnQError>>()?;
Ok(partitions)
}
pub async fn partitions_from_http_uri<'a, 'b, F, T>(
uri: &'a URIReference<'b>,
pub async fn partitions_from_http_uri<'a, F, T>(
t: &'a TableSource,
_uri: URIReference<'a>,
mut partition_reader: F,
) -> Result<T, ColumnQError>
) -> Result<Vec<T>, ColumnQError>
where
F: FnMut(std::io::Cursor<bytes::Bytes>) -> Result<T, ColumnQError>,
{
let resp = reqwest::get(&uri.to_string())
let resp = reqwest::get(&t.uri)
.await
.map_err(|e| ColumnQError::HttpStore(e.to_string()))?;
if resp.status().as_u16() / 100 != 2 {
@ -35,5 +54,7 @@ where
let reader = std::io::Cursor::new(resp.bytes().await.map_err(|e| {
ColumnQError::HttpStore(format!("Failed to decode server response: {}", e))
})?);
partition_reader(reader)
// HTTP store doesn't support directory listing, so we always only return a single partition
Ok(vec![partition_reader(reader)?])
}

View File

@ -3,23 +3,25 @@ extern crate lazy_static;
pub mod error;
macro_rules! with_reader_from_uri {
($call_with_r:expr, $uri:ident) => {
match $uri.scheme() {
/// Loads all partitions of `$table_source` by dispatching on the scheme of its parsed URI.
///
/// `$call_with_r` is the per-partition reader callback forwarded to the store helper.
/// A missing scheme or a filesystem scheme goes to `crate::io::partitions_from_fs_uri`;
/// HTTP and HTTPS go to `crate::io::partitions_from_http_uri`, which is awaited.
/// Any other scheme evaluates to `Err(ColumnQError::InvalidUri(..))`.
///
/// The expansion uses `?` and `.await`, so it must be invoked inside an async
/// function whose error type accepts the error of `parsed_uri()`, and `ColumnQError`
/// must be in scope at the call site because it is referenced unqualified.
macro_rules! partitions_from_table_source {
($table_source:ident, $call_with_r:expr) => {{
let uri = $table_source.parsed_uri()?;
match uri.scheme() {
// default to local file when the URI scheme is not provided
None | Some(uriparse::Scheme::FileSystem) => {
crate::io::partitions_from_fs_uri(&$uri, $call_with_r)
crate::io::partitions_from_fs_uri(&$table_source, uri, $call_with_r)
}
Some(uriparse::Scheme::HTTP) | Some(uriparse::Scheme::HTTPS) => {
crate::io::partitions_from_http_uri(&$uri, $call_with_r).await
crate::io::partitions_from_http_uri(&$table_source, uri, $call_with_r).await
}
// "s3" => {}
_ => Err(ColumnQError::InvalidUri(format!(
"Unsupported scheme in table uri: {:?}",
$uri
$table_source.uri,
))),
}
};
}};
}
pub mod columnq;

View File

@ -1,5 +1,6 @@
use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use crate::error::ColumnQError;
@ -14,26 +15,30 @@ pub async fn to_mem_table(
let batch_size = 1024;
let projection = None;
let uri = t.parsed_uri()?;
let schema_ref: arrow::datatypes::SchemaRef = match &t.schema {
Some(s) => Arc::new(s.into()),
None => {
let schema = with_reader_from_uri!(
|mut r| {
Ok(arrow::csv::reader::infer_schema_from_reader(
&mut r, delimiter, None, has_header,
)?
.0)
},
uri
)?;
Arc::new(schema)
let schemas = partitions_from_table_source!(t, |mut r| {
let (schema, record_count) = arrow::csv::reader::infer_schema_from_reader(
&mut r, delimiter, None, has_header,
)?;
if record_count > 0 {
Ok(Some(schema))
} else {
Ok(None)
}
})?
.into_iter()
.filter_map(|e| e)
.collect::<Vec<_>>();
Arc::new(Schema::try_merge(schemas)?)
}
};
let batches: Vec<RecordBatch> = with_reader_from_uri!(
|r| -> Result<Vec<RecordBatch>, ColumnQError> {
let partitions: Vec<Vec<RecordBatch>> =
partitions_from_table_source!(t, |r| -> Result<Vec<RecordBatch>, ColumnQError> {
let csv_reader = arrow::csv::Reader::new(
r,
schema_ref.clone(),
@ -48,12 +53,44 @@ pub async fn to_mem_table(
.into_iter()
.map(|batch| Ok(batch?))
.collect::<Result<Vec<RecordBatch>, ColumnQError>>()
},
uri
)?;
})?;
let partitions = vec![batches];
Ok(datafusion::datasource::MemTable::try_new(
schema_ref, partitions,
)?)
}
#[cfg(test)]
mod tests {
    use super::*;

    use std::fs;

    use datafusion::datasource::TableProvider;

    use crate::table::TableLoadOption;
    use crate::test_util::*;

    /// Copies the same CSV fixture into a temporary directory under three
    /// different file names, loads the directory as one table and checks the
    /// reported row count against `37 * 3`.
    #[tokio::test]
    async fn load_partitions() -> anyhow::Result<()> {
        let partition_dir = tempdir::TempDir::new("columnq.test.csv_partitions")?;
        let dir = partition_dir.path();
        let fixture = test_data_path("uk_cities_with_headers.csv");

        // every partition is a byte-for-byte copy of the fixture
        for partition_file in &["2020-01-01.csv", "2020-01-02.csv", "2020-01-03.csv"] {
            let copied_bytes = fs::copy(&fixture, dir.join(partition_file))?;
            assert!(copied_bytes > 0);
        }

        let source = TableSource {
            name: "uk_cities".to_string(),
            uri: dir.to_string_lossy().to_string(),
            schema: None,
            option: Some(TableLoadOption::csv {}),
        };
        let table = to_mem_table(&source).await?;

        assert_eq!(table.statistics().num_rows, Some(37 * 3));

        Ok(())
    }
}

View File

@ -1,23 +1,22 @@
use std::io::Read;
use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use serde_json::value::Value;
use uriparse::URIReference;
use crate::error::ColumnQError;
use crate::table::{TableLoadOption, TableSource};
use crate::table::{TableLoadOption, TableSchema, TableSource};
fn json_value_from_reader<R: Read>(r: R) -> Result<Value, ColumnQError> {
serde_json::from_reader(r).map_err(ColumnQError::json_parse)
}
async fn load_array_by_path<'a>(
uri: URIReference<'a>,
fn json_partition_to_vec<'a>(
json_partition: &Value,
pointer: Option<&'a str>,
) -> Result<Vec<Value>, ColumnQError> {
let payload: Value = with_reader_from_uri!(json_value_from_reader, uri)?;
let mut value_ref: &Value = &payload;
let mut value_ref = json_partition;
if let Some(p) = pointer {
match value_ref.pointer(p) {
@ -40,6 +39,67 @@ async fn load_array_by_path<'a>(
}
}
/// Converts the rows of one JSON partition into an Arrow schema plus its record batches.
///
/// * `json_rows` - the rows of the partition; objects, or arrays when `array_encoded`.
/// * `provided_schema` - when `Some`, converted into the Arrow schema; otherwise the
///   schema is inferred from `json_rows`.
/// * `batch_size` - batch size handed to the Arrow JSON decoder.
/// * `array_encoded` - when `true`, every row is indexed by position and rewritten
///   into an object keyed by the schema field names before decoding.
///
/// The decoder is drained until it reports no more input, so partitions with more
/// rows than a single `next_batch` call yields are fully decoded instead of being
/// truncated after the first batch.
///
/// # Errors
///
/// Returns `ColumnQError::LoadJson` when schema inference fails, when decoding
/// fails (including an array-encoded row that is missing a column), or when the
/// decoder produces no record batch at all.
fn json_vec_to_partition(
    json_rows: Vec<Value>,
    provided_schema: &Option<TableSchema>,
    batch_size: usize,
    array_encoded: bool,
) -> Result<(arrow::datatypes::Schema, Vec<RecordBatch>), ColumnQError> {
    // load schema
    let schema = match provided_schema {
        Some(s) => s.into(),
        None => arrow::json::reader::infer_json_schema_from_iterator(
            json_rows.iter().map(|v| Ok(v.clone())),
        )
        .map_err(|e| {
            ColumnQError::LoadJson(format!("Failed to infer schema from JSON data: {}", e))
        })?,
    };

    // decode to arrow record batches
    let decoder = arrow::json::reader::Decoder::new(Arc::new(schema.clone()), batch_size, None);
    let mut batches = vec![];
    {
        // enclose values_iter in its own scope so it won't borrow schema until the end of
        // this function
        let mut values_iter: Box<dyn Iterator<Item = arrow::error::Result<Value>>>;
        values_iter = if array_encoded {
            // convert row array to object based on schema
            // TODO: support array_encoded read in upstream arrow json reader instead
            Box::new(json_rows.into_iter().map(|json_row| {
                let mut m = serde_json::map::Map::new();
                schema.fields().iter().enumerate().try_for_each(|(i, f)| {
                    match json_row.get(i) {
                        Some(x) => {
                            m.insert(f.name().to_string(), x.clone());
                            Ok(())
                        }
                        None => Err(arrow::error::ArrowError::JsonError(format!(
                            "arry encoded JSON row missing column {:?} : {:?}",
                            i, json_row
                        ))),
                    }
                })?;
                Ok(Value::Object(m))
            }))
        } else {
            // no need to convert row since each row is already an object
            Box::new(json_rows.into_iter().map(Ok))
        };

        // keep pulling batches until the decoder has consumed every row; a single
        // next_batch call may stop early once it has read batch_size rows
        while let Some(batch) = decoder.next_batch(&mut values_iter).map_err(|e| {
            ColumnQError::LoadJson(format!("Failed decode JSON into Arrow record batch: {}", e))
        })? {
            batches.push(batch);
        }
    }

    if batches.is_empty() {
        return Err(ColumnQError::LoadJson(
            "JSON data results in empty arrow record batch".to_string(),
        ));
    }

    Ok((schema, batches))
}
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
@ -61,82 +121,47 @@ pub async fn to_mem_table(
_ => None,
};
// load array from file
let json_rows = load_array_by_path(t.parsed_uri()?, pointer.as_deref()).await?;
let mut merged_schema: Option<Schema> = None;
let json_partitions: Vec<Value> = partitions_from_table_source!(t, json_value_from_reader)?;
if json_rows.is_empty() {
match pointer {
Some(p) => {
return Err(ColumnQError::LoadJson(format!(
"{} points to an emtpy array",
p
)));
let partitions = json_partitions
.iter()
.map(|json_partition| {
let json_rows = json_partition_to_vec(json_partition, pointer.as_deref())?;
if json_rows.is_empty() {
match &pointer {
Some(p) => {
return Err(ColumnQError::LoadJson(format!(
"{} points to an emtpy array",
p
)));
}
None => {
return Err(ColumnQError::LoadJson(
"JSON data is an emtpy array".to_string(),
));
}
}
}
None => {
return Err(ColumnQError::LoadJson(
"JSON data is an emtpy array".to_string(),
));
}
}
}
// load schema
let schema_ref: arrow::datatypes::SchemaRef = match &t.schema {
Some(s) => Arc::new(s.into()),
None => arrow::json::reader::infer_json_schema_from_iterator(
json_rows.iter().map(|v| Ok(v.clone())),
)
.map_err(|e| {
ColumnQError::LoadJson(format!("Failed to infer schema from JSON data: {}", e))
})?,
};
let (batch_schema, partition) =
json_vec_to_partition(json_rows, &t.schema, batch_size, array_encoded)?;
// decode to arrow record batch
let decoder = arrow::json::reader::Decoder::new(schema_ref.clone(), batch_size, None);
let batch = {
// enclose values_iter in its own scope so it won't brrow schema_ref til end of this
// function
let mut values_iter: Box<dyn Iterator<Item = arrow::error::Result<Value>>>;
values_iter = if array_encoded {
// convert row array to object based on schema
// TODO: support array_encoded read in arrow json reader instead
Box::new(json_rows.into_iter().map(|json_row| {
let mut m = serde_json::map::Map::new();
schema_ref
.fields()
.iter()
.enumerate()
.try_for_each(|(i, f)| match json_row.get(i) {
Some(x) => {
m.insert(f.name().to_string(), x.clone());
Ok(())
}
None => Err(arrow::error::ArrowError::JsonError(format!(
"arry encoded JSON row missing column {:?} : {:?}",
i, json_row
))),
})?;
Ok(Value::Object(m))
}))
} else {
// no need to convert row since each row is already an object
Box::new(json_rows.into_iter().map(Ok))
};
merged_schema = Some(match &merged_schema {
Some(s) if s != &batch_schema => Schema::try_merge(vec![s.clone(), batch_schema])?,
_ => batch_schema,
});
// decode whole array into single record batch
decoder
.next_batch(&mut values_iter)
.map_err(|e| {
ColumnQError::LoadJson(format!("Failed decode JSON into Arrow record batch: {}", e))
})?
.ok_or_else(|| {
ColumnQError::LoadJson("JSON data results in empty arrow record batch".to_string())
})?
};
let partitions = vec![vec![batch]];
Ok(partition)
})
.collect::<Result<Vec<Vec<RecordBatch>>, ColumnQError>>()?;
Ok(datafusion::datasource::MemTable::try_new(
schema_ref, partitions,
Arc::new(
merged_schema
.ok_or_else(|| ColumnQError::LoadJson("failed to load schema".to_string()))?,
),
partitions,
)?)
}
@ -194,7 +219,7 @@ mod tests {
];
expected_obj_keys.sort();
assert_eq!(obj_keys, expected_obj_keys,);
assert_eq!(obj_keys, expected_obj_keys);
Ok(())
}

View File

@ -69,6 +69,15 @@ impl TableLoadOption {
)),
}
}
/// Returns the file extension, without a leading dot, that belongs to this load option.
///
/// The mapping is fixed: `json`, `csv`, `parquet`, and `gsheet` for Google
/// spreadsheets. The returned string is a static literal and does not borrow
/// from `self`.
pub fn extension(&self) -> &'static str {
    match self {
        TableLoadOption::json { .. } => "json",
        TableLoadOption::csv { .. } => "csv",
        TableLoadOption::parquet { .. } => "parquet",
        TableLoadOption::google_spreadsheet(_) => "gsheet",
    }
}
}
#[derive(Deserialize, Clone)]
@ -82,6 +91,7 @@ pub struct TableSource {
impl TableSource {
pub fn new(name: String, uri: String) -> Self {
// TODO: parse table format from uri during initialization?
Self {
name,
uri,
@ -94,10 +104,36 @@ impl TableSource {
URIReference::try_from(self.uri.as_str())
.map_err(|_| ColumnQError::InvalidUri(self.uri.clone()))
}
/// Returns the file extension, without a leading dot, used for this table's data files.
///
/// When a load option is set, its extension wins. Otherwise the extension is
/// taken from the trailing `.ext` of `self.uri`, and only `csv`, `json` and
/// `parquet` are accepted.
///
/// # Errors
///
/// Returns `ColumnQError::InvalidUri` when no load option is set and either the
/// URI has no UTF-8 extension or the extension is not a supported format.
pub fn extension(&self) -> Result<&str, ColumnQError> {
    // an explicit load option always decides the format
    if let Some(opt) = &self.option {
        return Ok(opt.extension());
    }

    let ext = Path::new(&self.uri)
        .extension()
        .and_then(OsStr::to_str)
        .ok_or_else(|| {
            ColumnQError::InvalidUri(format!(
                "cannot detect table extension from uri: {}",
                self.uri
            ))
        })?;

    match ext {
        "csv" | "json" | "parquet" => Ok(ext),
        _ => Err(ColumnQError::InvalidUri(format!(
            "unsupported extension in uri: {}",
            self.uri
        ))),
    }
}
}
pub async fn load(t: &TableSource) -> Result<datafusion::datasource::MemTable, ColumnQError> {
// TODO: support reading list of files within directory
if let Some(opt) = &t.option {
return Ok(match opt {
TableLoadOption::json { .. } => json::to_mem_table(t).await?,

View File

@ -18,10 +18,8 @@ pub async fn to_mem_table(
let mut schema: Option<Schema> = None;
let uri = t.parsed_uri()?;
let partition: Vec<RecordBatch> = with_reader_from_uri!(
|mut r| -> Result<Vec<RecordBatch>, ColumnQError> {
let partitions: Vec<Vec<RecordBatch>> =
partitions_from_table_source!(t, |mut r| -> Result<Vec<RecordBatch>, ColumnQError> {
// TODO: this is very inefficient, we are copying the parquet data in memory twice when
// it's being fetched from http store
let mut buffer = Vec::new();
@ -41,35 +39,35 @@ pub async fn to_mem_table(
ColumnQError::LoadParquet("failed to load schema from partition".to_string())
})?;
schema = Some(match &schema {
Some(s) => Schema::try_merge(vec![s.clone(), batch_schema])?,
None => batch_schema,
Some(s) if s != &batch_schema => Schema::try_merge(vec![s.clone(), batch_schema])?,
_ => batch_schema,
});
Ok(record_batch_reader
.into_iter()
.collect::<arrow::error::Result<Vec<RecordBatch>>>()?)
},
uri
)?;
})?;
Ok(datafusion::datasource::MemTable::try_new(
Arc::new(
schema.ok_or_else(|| ColumnQError::LoadParquet("failed to load schema".to_string()))?,
),
vec![partition],
partitions,
)?)
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use datafusion::datasource::TableProvider;
use crate::table::TableLoadOption;
use crate::test_util::*;
#[tokio::test]
async fn simple_parquet_load() -> Result<(), ColumnQError> {
async fn load_simple_parquet() -> Result<(), ColumnQError> {
let t = to_mem_table(&TableSource {
name: "blogs".to_string(),
uri: test_data_path("blogs.parquet"),
@ -91,4 +89,35 @@ mod tests {
Ok(())
}
/// Copies the same parquet fixture into a temporary directory under three
/// different file names, loads the directory as one table, then checks the
/// `writer.model.name` schema metadata and the reported row count.
#[tokio::test]
async fn load_partitions() -> anyhow::Result<()> {
    let partition_dir = tempdir::TempDir::new("columnq.test.parquet_partitions")?;
    let dir = partition_dir.path();
    let fixture = test_data_path("blogs.parquet");

    // every partition is a byte-for-byte copy of the fixture
    for partition_file in &[
        "2020-01-01.parquet",
        "2020-01-02.parquet",
        "2020-01-03.parquet",
    ] {
        let copied_bytes = fs::copy(&fixture, dir.join(partition_file))?;
        assert!(copied_bytes > 0);
    }

    let source = TableSource {
        name: "blogs".to_string(),
        uri: dir.to_string_lossy().to_string(),
        schema: None,
        option: Some(TableLoadOption::parquet {}),
    };
    let table = to_mem_table(&source).await?;

    // keep the schema alive while its metadata is borrowed
    let table_schema = table.schema();
    let writer_model = table_schema
        .metadata()
        .get("writer.model.name")
        .map(|s| s.as_str());
    assert_eq!(writer_model, Some("protobuf"));

    assert_eq!(table.statistics().num_rows, Some(1500));

    Ok(())
}
}