feat: support http store for parquet data source (#5)

This commit is contained in:
QP Hou 2021-02-20 14:42:22 -08:00 committed by GitHub
parent f6dfc1faaf
commit 2f4646bbb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 106 additions and 42 deletions

6
Cargo.lock generated
View File

@ -293,7 +293,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "arrow"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/houqp/arrow.git?rev=254318d1d3fca67dd60483e121299b52cf7fab44#254318d1d3fca67dd60483e121299b52cf7fab44"
source = "git+https://github.com/houqp/arrow.git?rev=e5b567d3582ba9fa91b29ab1c944de97b491bacb#e5b567d3582ba9fa91b29ab1c944de97b491bacb"
dependencies = [
"cfg_aliases",
"chrono",
@ -778,7 +778,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/houqp/arrow.git?rev=254318d1d3fca67dd60483e121299b52cf7fab44#254318d1d3fca67dd60483e121299b52cf7fab44"
source = "git+https://github.com/houqp/arrow.git?rev=e5b567d3582ba9fa91b29ab1c944de97b491bacb#e5b567d3582ba9fa91b29ab1c944de97b491bacb"
dependencies = [
"ahash 0.7.0",
"arrow",
@ -1711,7 +1711,7 @@ dependencies = [
[[package]]
name = "parquet"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/houqp/arrow.git?rev=254318d1d3fca67dd60483e121299b52cf7fab44#254318d1d3fca67dd60483e121299b52cf7fab44"
source = "git+https://github.com/houqp/arrow.git?rev=e5b567d3582ba9fa91b29ab1c944de97b491bacb#e5b567d3582ba9fa91b29ab1c944de97b491bacb"
dependencies = [
"arrow",
"base64 0.12.3",

View File

@ -6,9 +6,9 @@ members = [
]
[patch.crates-io]
arrow = { git = "https://github.com/houqp/arrow.git", rev = "254318d1d3fca67dd60483e121299b52cf7fab44" }
parquet = { git = "https://github.com/houqp/arrow.git", rev = "254318d1d3fca67dd60483e121299b52cf7fab44" }
datafusion = { git = "https://github.com/houqp/arrow.git", rev = "254318d1d3fca67dd60483e121299b52cf7fab44" }
arrow = { git = "https://github.com/houqp/arrow.git", rev = "e5b567d3582ba9fa91b29ab1c944de97b491bacb" }
parquet = { git = "https://github.com/houqp/arrow.git", rev = "e5b567d3582ba9fa91b29ab1c944de97b491bacb" }
datafusion = { git = "https://github.com/houqp/arrow.git", rev = "e5b567d3582ba9fa91b29ab1c944de97b491bacb" }
actix-web = { git = "https://github.com/actix/actix-web.git", rev = "b37669cb3b47eeb60c161ff12156d1c877af91a2" }
actix-http = { git = "https://github.com/actix/actix-web.git", rev = "b37669cb3b47eeb60c161ff12156d1c877af91a2" }

View File

@ -6,10 +6,10 @@ use crate::error::ColumnQError;
pub fn partitions_from_fs_uri<F, T>(
uri: &URIReference,
partition_reader: F,
mut partition_reader: F,
) -> Result<T, ColumnQError>
where
F: Fn(std::fs::File) -> Result<T, ColumnQError>,
F: FnMut(std::fs::File) -> Result<T, ColumnQError>,
{
let reader = fs::File::open(uri.path().to_string())
.map_err(|e| ColumnQError::FileStore(format!("open file error: {}", e)))?;
@ -18,10 +18,10 @@ where
pub async fn partitions_from_http_uri<'a, 'b, F, T>(
uri: &'a URIReference<'b>,
partition_reader: F,
mut partition_reader: F,
) -> Result<T, ColumnQError>
where
F: Fn(std::io::Cursor<bytes::Bytes>) -> Result<T, ColumnQError>,
F: FnMut(std::io::Cursor<bytes::Bytes>) -> Result<T, ColumnQError>,
{
let resp = reqwest::get(&uri.to_string())
.await

View File

@ -7,10 +7,10 @@ macro_rules! with_reader_from_uri {
($call_with_r:expr, $uri:ident) => {
match $uri.scheme() {
// default to local file when schema is not provided
None | Some(Scheme::FileSystem) => {
None | Some(uriparse::Scheme::FileSystem) => {
crate::io::partitions_from_fs_uri(&$uri, $call_with_r)
}
Some(Scheme::HTTP) | Some(Scheme::HTTPS) => {
Some(uriparse::Scheme::HTTP) | Some(uriparse::Scheme::HTTPS) => {
crate::io::partitions_from_http_uri(&$uri, $call_with_r).await
}
// "s3" => {}

View File

@ -1,8 +1,6 @@
use std::convert::TryFrom;
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use uriparse::{Scheme, URIReference};
use crate::error::ColumnQError;
use crate::table::TableSource;
@ -16,8 +14,7 @@ pub async fn to_mem_table(
let batch_size = 1024;
let projection = None;
let uri = URIReference::try_from(t.uri.as_str())
.map_err(|_| ColumnQError::InvalidUri(t.uri.clone()))?;
let uri = t.parsed_uri()?;
let schema_ref: arrow::datatypes::SchemaRef = match &t.schema {
Some(s) => Arc::new(s.into()),

View File

@ -1,9 +1,8 @@
use std::convert::TryFrom;
use std::io::Read;
use std::sync::Arc;
use serde_json::value::Value;
use uriparse::{Scheme, URIReference};
use uriparse::URIReference;
use crate::error::ColumnQError;
use crate::table::{TableLoadOption, TableSource};
@ -12,13 +11,10 @@ fn json_value_from_reader<R: Read>(r: R) -> Result<Value, ColumnQError> {
serde_json::from_reader(r).map_err(ColumnQError::json_parse)
}
async fn load_array_by_path(
uri_s: &str,
pointer: Option<&str>,
async fn load_array_by_path<'a>(
uri: URIReference<'a>,
pointer: Option<&'a str>,
) -> Result<Vec<Value>, ColumnQError> {
let uri =
URIReference::try_from(uri_s).map_err(|_| ColumnQError::InvalidUri(uri_s.to_string()))?;
let payload: Value = with_reader_from_uri!(json_value_from_reader, uri)?;
let mut value_ref: &Value = &payload;
@ -47,6 +43,7 @@ async fn load_array_by_path(
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
// TODO: make batch size configurable
let batch_size = 1024;
let array_encoded = match &t.option {
Some(TableLoadOption::json { array_encoded, .. }) => array_encoded.unwrap_or(false),
@ -65,7 +62,7 @@ pub async fn to_mem_table(
};
// load array from file
let json_rows = load_array_by_path(&t.uri, pointer.as_deref()).await?;
let json_rows = load_array_by_path(t.parsed_uri()?, pointer.as_deref()).await?;
if json_rows.is_empty() {
match pointer {
@ -152,7 +149,7 @@ mod tests {
use crate::test_util::*;
#[tokio::test]
async fn test_nested_struct_and_lists() -> Result<(), ColumnQError> {
async fn nested_struct_and_lists() -> Result<(), ColumnQError> {
let t = to_mem_table(&TableSource {
name: "spacex_launches".to_string(),
uri: test_data_path("spacex-launches.json"),

View File

@ -1,7 +1,9 @@
use std::convert::TryFrom;
use std::ffi::OsStr;
use std::path::Path;
use serde_derive::Deserialize;
use uriparse::URIReference;
use crate::error::ColumnQError;
@ -87,6 +89,11 @@ impl TableSource {
option: None,
}
}
pub fn parsed_uri(&self) -> Result<URIReference, ColumnQError> {
URIReference::try_from(self.uri.as_str())
.map_err(|_| ColumnQError::InvalidUri(self.uri.clone()))
}
}
pub async fn load(t: &TableSource) -> Result<datafusion::datasource::MemTable, ColumnQError> {
@ -95,7 +102,7 @@ pub async fn load(t: &TableSource) -> Result<datafusion::datasource::MemTable, C
return Ok(match opt {
TableLoadOption::json { .. } => json::to_mem_table(t).await?,
TableLoadOption::csv { .. } => csv::to_mem_table(t).await?,
TableLoadOption::parquet { .. } => parquet::to_mem_table(t)?,
TableLoadOption::parquet { .. } => parquet::to_mem_table(t).await?,
TableLoadOption::google_spreadsheet(_) => google_spreadsheets::to_mem_table(t).await?,
});
}
@ -105,7 +112,7 @@ pub async fn load(t: &TableSource) -> Result<datafusion::datasource::MemTable, C
match Path::new(&t.uri).extension().and_then(OsStr::to_str) {
Some("csv") => csv::to_mem_table(t).await?,
Some("json") => json::to_mem_table(t).await?,
Some("parquet") => parquet::to_mem_table(t)?,
Some("parquet") => parquet::to_mem_table(t).await?,
Some(ext) => {
return Err(ColumnQError::InvalidUri(format!(
"failed to register `{}` as table `{}`, unsupported table format `{}`",

View File

@ -1,31 +1,94 @@
use std::fs::File;
use std::io::Read;
use std::sync::Arc;
use arrow::record_batch::RecordBatchReader;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use parquet::file::reader::SerializedFileReader;
use parquet::file::serialized_reader::SliceableCursor;
use crate::error::ColumnQError;
use crate::table::TableSource;
pub fn to_mem_table(t: &TableSource) -> Result<datafusion::datasource::MemTable, ColumnQError> {
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
// TODO: make batch size configurable
let batch_size = 1024;
let file = File::open(&t.uri).map_err(ColumnQError::open_parquet_file)?;
let file_reader = SerializedFileReader::new(file).map_err(ColumnQError::parquet_file_reader)?;
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
let mut schema: Option<Schema> = None;
let record_batch_reader = arrow_reader
.get_record_reader(batch_size)
.map_err(ColumnQError::parquet_record_reader)?;
let schema_ref = record_batch_reader.schema();
let uri = t.parsed_uri()?;
let partition = record_batch_reader
.into_iter()
.collect::<arrow::error::Result<Vec<arrow::record_batch::RecordBatch>>>()?;
let partition: Vec<RecordBatch> = with_reader_from_uri!(
|mut r| -> Result<Vec<RecordBatch>, ColumnQError> {
// TODO: this is very inefficient, we are copying the parquet data in memory twice when
// it's being fetched from http store
let mut buffer = Vec::new();
r.read_to_end(&mut buffer).map_err(|_| {
ColumnQError::LoadParquet("failed to copy parquet data in memory".to_string())
})?;
let file_reader = SerializedFileReader::new(SliceableCursor::new(buffer))
.map_err(ColumnQError::parquet_file_reader)?;
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
let record_batch_reader = arrow_reader
.get_record_reader(batch_size)
.map_err(ColumnQError::parquet_record_reader)?;
let batch_schema = arrow_reader.get_schema().map_err(|_| {
ColumnQError::LoadParquet("failed to load schema from partition".to_string())
})?;
schema = Some(match &schema {
Some(s) => Schema::try_merge(vec![s.clone(), batch_schema])?,
None => batch_schema,
});
Ok(record_batch_reader
.into_iter()
.collect::<arrow::error::Result<Vec<RecordBatch>>>()?)
},
uri
)?;
Ok(datafusion::datasource::MemTable::try_new(
schema_ref,
Arc::new(
schema.ok_or_else(|| ColumnQError::LoadParquet("failed to load schema".to_string()))?,
),
vec![partition],
)?)
}
#[cfg(test)]
mod tests {
use super::*;
use datafusion::datasource::TableProvider;
use crate::test_util::*;
#[tokio::test]
async fn simple_parquet_load() -> Result<(), ColumnQError> {
let t = to_mem_table(&TableSource {
name: "blogs".to_string(),
uri: test_data_path("blogs.parquet"),
schema: None,
option: None,
})
.await?;
let schema = t.schema();
assert_eq!(
schema
.metadata()
.get("writer.model.name")
.map(|s| s.as_str()),
Some("protobuf")
);
assert_eq!(t.statistics().num_rows, Some(500));
Ok(())
}
}