From f6dfc1faaf7362d5dd16509d051f30a4b4db50e1 Mon Sep 17 00:00:00 2001 From: QP Hou Date: Sat, 20 Feb 2021 01:47:11 -0800 Subject: [PATCH] feat: support https transport for csv data source (#4) close #1 --- Cargo.lock | 6 ++-- Cargo.toml | 6 ++-- columnq/src/io/mod.rs | 39 ++++++++++++++++++++++++ columnq/src/lib.rs | 19 ++---------- columnq/src/table/csv.rs | 62 +++++++++++++++++++++++++-------------- columnq/src/table/json.rs | 9 ++++-- columnq/src/table/mod.rs | 4 +-- 7 files changed, 96 insertions(+), 49 deletions(-) create mode 100644 columnq/src/io/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 8d331e8..10ea9ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,7 +293,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/houqp/arrow.git?rev=bb6f8e1448f4065c54938b9af92adcf1adeffc47#bb6f8e1448f4065c54938b9af92adcf1adeffc47" +source = "git+https://github.com/houqp/arrow.git?rev=254318d1d3fca67dd60483e121299b52cf7fab44#254318d1d3fca67dd60483e121299b52cf7fab44" dependencies = [ "cfg_aliases", "chrono", @@ -778,7 +778,7 @@ dependencies = [ [[package]] name = "datafusion" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/houqp/arrow.git?rev=bb6f8e1448f4065c54938b9af92adcf1adeffc47#bb6f8e1448f4065c54938b9af92adcf1adeffc47" +source = "git+https://github.com/houqp/arrow.git?rev=254318d1d3fca67dd60483e121299b52cf7fab44#254318d1d3fca67dd60483e121299b52cf7fab44" dependencies = [ "ahash 0.7.0", "arrow", @@ -1711,7 +1711,7 @@ dependencies = [ [[package]] name = "parquet" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/houqp/arrow.git?rev=bb6f8e1448f4065c54938b9af92adcf1adeffc47#bb6f8e1448f4065c54938b9af92adcf1adeffc47" +source = "git+https://github.com/houqp/arrow.git?rev=254318d1d3fca67dd60483e121299b52cf7fab44#254318d1d3fca67dd60483e121299b52cf7fab44" dependencies = [ "arrow", "base64 0.12.3", diff --git a/Cargo.toml b/Cargo.toml index 71f8bc2..acc2029 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,9 +6,9 @@ members = [ ] [patch.crates-io] -arrow = { git = "https://github.com/houqp/arrow.git", rev = "bb6f8e1448f4065c54938b9af92adcf1adeffc47" } -parquet = { git = "https://github.com/houqp/arrow.git", rev = "bb6f8e1448f4065c54938b9af92adcf1adeffc47" } -datafusion = { git = "https://github.com/houqp/arrow.git", rev = "bb6f8e1448f4065c54938b9af92adcf1adeffc47" } +arrow = { git = "https://github.com/houqp/arrow.git", rev = "254318d1d3fca67dd60483e121299b52cf7fab44" } +parquet = { git = "https://github.com/houqp/arrow.git", rev = "254318d1d3fca67dd60483e121299b52cf7fab44" } +datafusion = { git = "https://github.com/houqp/arrow.git", rev = "254318d1d3fca67dd60483e121299b52cf7fab44" } actix-web = { git = "https://github.com/actix/actix-web.git", rev = "b37669cb3b47eeb60c161ff12156d1c877af91a2" } actix-http = { git = "https://github.com/actix/actix-web.git", rev = "b37669cb3b47eeb60c161ff12156d1c877af91a2" } diff --git a/columnq/src/io/mod.rs b/columnq/src/io/mod.rs new file mode 100644 index 0000000..46784a5 --- /dev/null +++ b/columnq/src/io/mod.rs @@ -0,0 +1,39 @@ +use std::fs; + +use uriparse::URIReference; + +use crate::error::ColumnQError; + +pub fn partitions_from_fs_uri( + uri: &URIReference, + partition_reader: F, +) -> Result +where + F: Fn(std::fs::File) -> Result, +{ + let reader = fs::File::open(uri.path().to_string()) + .map_err(|e| ColumnQError::FileStore(format!("open file error: {}", e)))?; + partition_reader(reader) +} + +pub async fn partitions_from_http_uri<'a, 'b, F, T>( + uri: &'a URIReference<'b>, + partition_reader: F, +) -> Result +where + F: Fn(std::io::Cursor) -> Result, +{ + let resp = reqwest::get(&uri.to_string()) + .await + .map_err(|e| ColumnQError::HttpStore(e.to_string()))?; + if resp.status().as_u16() / 100 != 2 { + return Err(ColumnQError::HttpStore(format!( + "Invalid response from server: {:?}", + resp + ))); + } + let reader = std::io::Cursor::new(resp.bytes().await.map_err(|e| { + ColumnQError::HttpStore(format!("Failed to decode server response: {}", e)) + })?); + partition_reader(reader) +} diff --git a/columnq/src/lib.rs b/columnq/src/lib.rs index ed32de1..66745e9 100644 --- a/columnq/src/lib.rs +++ b/columnq/src/lib.rs @@ -8,24 +8,10 @@ macro_rules! with_reader_from_uri { match $uri.scheme() { // default to local file when schema is not provided None | Some(Scheme::FileSystem) => { - let reader = fs::File::open($uri.path().to_string()) - .map_err(|e| ColumnQError::FileStore(format!("open file error: {}", e)))?; - $call_with_r(reader).map_err(ColumnQError::json_parse) + crate::io::partitions_from_fs_uri(&$uri, $call_with_r) } Some(Scheme::HTTP) | Some(Scheme::HTTPS) => { - let resp = reqwest::get(&$uri.to_string()) - .await - .map_err(|e| ColumnQError::HttpStore(e.to_string()))?; - if resp.status().as_u16() / 100 != 2 { - return Err(ColumnQError::HttpStore(format!( - "Invalid response from server: {:?}", - resp - ))); - } - let reader = Cursor::new(resp.bytes().await.map_err(|e| { - ColumnQError::HttpStore(format!("Failed to decode server response: {}", e)) - })?); - $call_with_r(reader).map_err(ColumnQError::json_parse) + crate::io::partitions_from_http_uri(&$uri, $call_with_r).await } // "s3" => {} _ => Err(ColumnQError::InvalidUri(format!( @@ -37,6 +23,7 @@ macro_rules! with_reader_from_uri { } pub mod columnq; +pub mod io; pub mod query; pub mod table; diff --git a/columnq/src/table/csv.rs b/columnq/src/table/csv.rs index a24f252..da82af8 100644 --- a/columnq/src/table/csv.rs +++ b/columnq/src/table/csv.rs @@ -1,41 +1,59 @@ -use std::fs; +use std::convert::TryFrom; use std::sync::Arc; +use arrow::record_batch::RecordBatch; +use uriparse::{Scheme, URIReference}; + use crate::error::ColumnQError; use crate::table::TableSource; -pub fn to_mem_table(t: &TableSource) -> Result { +pub async fn to_mem_table( + t: &TableSource, +) -> Result { // TODO: read csv option from config let has_header = true; let delimiter = b','; let batch_size = 1024; let projection = None; + let uri = URIReference::try_from(t.uri.as_str()) + .map_err(|_| ColumnQError::InvalidUri(t.uri.clone()))?; + let schema_ref: arrow::datatypes::SchemaRef = match &t.schema { Some(s) => Arc::new(s.into()), - None => Arc::new(arrow::csv::reader::infer_schema_from_files( - &[t.uri.clone()], - delimiter, - None, - has_header, - )?), + None => { + let schema = with_reader_from_uri!( + |mut r| { + Ok(arrow::csv::reader::infer_schema_from_reader( + &mut r, delimiter, None, has_header, + )? + .0) + }, + uri + )?; + Arc::new(schema) + } }; - let csv_reader = arrow::csv::Reader::new( - fs::File::open(&t.uri) - .map_err(|e| ColumnQError::LoadCsv(format!("open file error: {}", e)))?, - schema_ref.clone(), - has_header, - Some(delimiter), - batch_size, - None, - projection, - ); + let batches: Vec = with_reader_from_uri!( + |r| -> Result, ColumnQError> { + let csv_reader = arrow::csv::Reader::new( + r, + schema_ref.clone(), + has_header, + Some(delimiter), + batch_size, + None, + projection.clone(), + ); - let batches = csv_reader - .into_iter() - .map(|batch| Ok(batch?)) - .collect::, ColumnQError>>()?; + csv_reader + .into_iter() + .map(|batch| Ok(batch?)) + .collect::, ColumnQError>>() + }, + uri + )?; let partitions = vec![batches]; Ok(datafusion::datasource::MemTable::try_new( diff --git a/columnq/src/table/json.rs b/columnq/src/table/json.rs index 0319d79..8c5e735 100644 --- a/columnq/src/table/json.rs +++ b/columnq/src/table/json.rs @@ -1,6 +1,5 @@ use std::convert::TryFrom; -use std::fs; -use std::io::Cursor; +use std::io::Read; use std::sync::Arc; use serde_json::value::Value; @@ -9,6 +8,10 @@ use uriparse::{Scheme, URIReference}; use crate::error::ColumnQError; use crate::table::{TableLoadOption, TableSource}; +fn json_value_from_reader(r: R) -> Result { + serde_json::from_reader(r).map_err(ColumnQError::json_parse) +} + async fn load_array_by_path( uri_s: &str, pointer: Option<&str>, @@ -16,7 +19,7 @@ async fn load_array_by_path( let uri = URIReference::try_from(uri_s).map_err(|_| ColumnQError::InvalidUri(uri_s.to_string()))?; - let payload: Value = with_reader_from_uri!(serde_json::from_reader, uri)?; + let payload: Value = with_reader_from_uri!(json_value_from_reader, uri)?; let mut value_ref: &Value = &payload; diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index c0ef5eb..d1213ed 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -94,7 +94,7 @@ pub async fn load(t: &TableSource) -> Result json::to_mem_table(t).await?, - TableLoadOption::csv { .. } => csv::to_mem_table(t)?, + TableLoadOption::csv { .. } => csv::to_mem_table(t).await?, TableLoadOption::parquet { .. } => parquet::to_mem_table(t)?, TableLoadOption::google_spreadsheet(_) => google_spreadsheets::to_mem_table(t).await?, }); @@ -103,7 +103,7 @@ pub async fn load(t: &TableSource) -> Result csv::to_mem_table(t)?, + Some("csv") => csv::to_mem_table(t).await?, Some("json") => json::to_mem_table(t).await?, Some("parquet") => parquet::to_mem_table(t)?, Some(ext) => {