feat: support https transport for csv data source (#4)

close #1
This commit is contained in:
QP Hou 2021-02-20 01:47:11 -08:00 committed by GitHub
parent 0c61bba493
commit f6dfc1faaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 96 additions and 49 deletions

6
Cargo.lock generated
View File

@ -293,7 +293,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "arrow"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/houqp/arrow.git?rev=bb6f8e1448f4065c54938b9af92adcf1adeffc47#bb6f8e1448f4065c54938b9af92adcf1adeffc47"
source = "git+https://github.com/houqp/arrow.git?rev=254318d1d3fca67dd60483e121299b52cf7fab44#254318d1d3fca67dd60483e121299b52cf7fab44"
dependencies = [
"cfg_aliases",
"chrono",
@ -778,7 +778,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/houqp/arrow.git?rev=bb6f8e1448f4065c54938b9af92adcf1adeffc47#bb6f8e1448f4065c54938b9af92adcf1adeffc47"
source = "git+https://github.com/houqp/arrow.git?rev=254318d1d3fca67dd60483e121299b52cf7fab44#254318d1d3fca67dd60483e121299b52cf7fab44"
dependencies = [
"ahash 0.7.0",
"arrow",
@ -1711,7 +1711,7 @@ dependencies = [
[[package]]
name = "parquet"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/houqp/arrow.git?rev=bb6f8e1448f4065c54938b9af92adcf1adeffc47#bb6f8e1448f4065c54938b9af92adcf1adeffc47"
source = "git+https://github.com/houqp/arrow.git?rev=254318d1d3fca67dd60483e121299b52cf7fab44#254318d1d3fca67dd60483e121299b52cf7fab44"
dependencies = [
"arrow",
"base64 0.12.3",

View File

@ -6,9 +6,9 @@ members = [
]
[patch.crates-io]
arrow = { git = "https://github.com/houqp/arrow.git", rev = "bb6f8e1448f4065c54938b9af92adcf1adeffc47" }
parquet = { git = "https://github.com/houqp/arrow.git", rev = "bb6f8e1448f4065c54938b9af92adcf1adeffc47" }
datafusion = { git = "https://github.com/houqp/arrow.git", rev = "bb6f8e1448f4065c54938b9af92adcf1adeffc47" }
arrow = { git = "https://github.com/houqp/arrow.git", rev = "254318d1d3fca67dd60483e121299b52cf7fab44" }
parquet = { git = "https://github.com/houqp/arrow.git", rev = "254318d1d3fca67dd60483e121299b52cf7fab44" }
datafusion = { git = "https://github.com/houqp/arrow.git", rev = "254318d1d3fca67dd60483e121299b52cf7fab44" }
actix-web = { git = "https://github.com/actix/actix-web.git", rev = "b37669cb3b47eeb60c161ff12156d1c877af91a2" }
actix-http = { git = "https://github.com/actix/actix-web.git", rev = "b37669cb3b47eeb60c161ff12156d1c877af91a2" }

39
columnq/src/io/mod.rs Normal file
View File

@ -0,0 +1,39 @@
use std::fs;
use uriparse::URIReference;
use crate::error::ColumnQError;
pub fn partitions_from_fs_uri<F, T>(
uri: &URIReference,
partition_reader: F,
) -> Result<T, ColumnQError>
where
F: Fn(std::fs::File) -> Result<T, ColumnQError>,
{
let reader = fs::File::open(uri.path().to_string())
.map_err(|e| ColumnQError::FileStore(format!("open file error: {}", e)))?;
partition_reader(reader)
}
pub async fn partitions_from_http_uri<'a, 'b, F, T>(
uri: &'a URIReference<'b>,
partition_reader: F,
) -> Result<T, ColumnQError>
where
F: Fn(std::io::Cursor<bytes::Bytes>) -> Result<T, ColumnQError>,
{
let resp = reqwest::get(&uri.to_string())
.await
.map_err(|e| ColumnQError::HttpStore(e.to_string()))?;
if resp.status().as_u16() / 100 != 2 {
return Err(ColumnQError::HttpStore(format!(
"Invalid response from server: {:?}",
resp
)));
}
let reader = std::io::Cursor::new(resp.bytes().await.map_err(|e| {
ColumnQError::HttpStore(format!("Failed to decode server response: {}", e))
})?);
partition_reader(reader)
}

View File

@ -8,24 +8,10 @@ macro_rules! with_reader_from_uri {
match $uri.scheme() {
// default to local file when schema is not provided
None | Some(Scheme::FileSystem) => {
let reader = fs::File::open($uri.path().to_string())
.map_err(|e| ColumnQError::FileStore(format!("open file error: {}", e)))?;
$call_with_r(reader).map_err(ColumnQError::json_parse)
crate::io::partitions_from_fs_uri(&$uri, $call_with_r)
}
Some(Scheme::HTTP) | Some(Scheme::HTTPS) => {
let resp = reqwest::get(&$uri.to_string())
.await
.map_err(|e| ColumnQError::HttpStore(e.to_string()))?;
if resp.status().as_u16() / 100 != 2 {
return Err(ColumnQError::HttpStore(format!(
"Invalid response from server: {:?}",
resp
)));
}
let reader = Cursor::new(resp.bytes().await.map_err(|e| {
ColumnQError::HttpStore(format!("Failed to decode server response: {}", e))
})?);
$call_with_r(reader).map_err(ColumnQError::json_parse)
crate::io::partitions_from_http_uri(&$uri, $call_with_r).await
}
// "s3" => {}
_ => Err(ColumnQError::InvalidUri(format!(
@ -37,6 +23,7 @@ macro_rules! with_reader_from_uri {
}
pub mod columnq;
pub mod io;
pub mod query;
pub mod table;

View File

@ -1,41 +1,59 @@
use std::fs;
use std::convert::TryFrom;
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use uriparse::{Scheme, URIReference};
use crate::error::ColumnQError;
use crate::table::TableSource;
pub fn to_mem_table(t: &TableSource) -> Result<datafusion::datasource::MemTable, ColumnQError> {
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
// TODO: read csv option from config
let has_header = true;
let delimiter = b',';
let batch_size = 1024;
let projection = None;
let uri = URIReference::try_from(t.uri.as_str())
.map_err(|_| ColumnQError::InvalidUri(t.uri.clone()))?;
let schema_ref: arrow::datatypes::SchemaRef = match &t.schema {
Some(s) => Arc::new(s.into()),
None => Arc::new(arrow::csv::reader::infer_schema_from_files(
&[t.uri.clone()],
delimiter,
None,
has_header,
)?),
None => {
let schema = with_reader_from_uri!(
|mut r| {
Ok(arrow::csv::reader::infer_schema_from_reader(
&mut r, delimiter, None, has_header,
)?
.0)
},
uri
)?;
Arc::new(schema)
}
};
let csv_reader = arrow::csv::Reader::new(
fs::File::open(&t.uri)
.map_err(|e| ColumnQError::LoadCsv(format!("open file error: {}", e)))?,
schema_ref.clone(),
has_header,
Some(delimiter),
batch_size,
None,
projection,
);
let batches: Vec<RecordBatch> = with_reader_from_uri!(
|r| -> Result<Vec<RecordBatch>, ColumnQError> {
let csv_reader = arrow::csv::Reader::new(
r,
schema_ref.clone(),
has_header,
Some(delimiter),
batch_size,
None,
projection.clone(),
);
let batches = csv_reader
.into_iter()
.map(|batch| Ok(batch?))
.collect::<Result<Vec<arrow::record_batch::RecordBatch>, ColumnQError>>()?;
csv_reader
.into_iter()
.map(|batch| Ok(batch?))
.collect::<Result<Vec<RecordBatch>, ColumnQError>>()
},
uri
)?;
let partitions = vec![batches];
Ok(datafusion::datasource::MemTable::try_new(

View File

@ -1,6 +1,5 @@
use std::convert::TryFrom;
use std::fs;
use std::io::Cursor;
use std::io::Read;
use std::sync::Arc;
use serde_json::value::Value;
@ -9,6 +8,10 @@ use uriparse::{Scheme, URIReference};
use crate::error::ColumnQError;
use crate::table::{TableLoadOption, TableSource};
fn json_value_from_reader<R: Read>(r: R) -> Result<Value, ColumnQError> {
serde_json::from_reader(r).map_err(ColumnQError::json_parse)
}
async fn load_array_by_path(
uri_s: &str,
pointer: Option<&str>,
@ -16,7 +19,7 @@ async fn load_array_by_path(
let uri =
URIReference::try_from(uri_s).map_err(|_| ColumnQError::InvalidUri(uri_s.to_string()))?;
let payload: Value = with_reader_from_uri!(serde_json::from_reader, uri)?;
let payload: Value = with_reader_from_uri!(json_value_from_reader, uri)?;
let mut value_ref: &Value = &payload;

View File

@ -94,7 +94,7 @@ pub async fn load(t: &TableSource) -> Result<datafusion::datasource::MemTable, C
if let Some(opt) = &t.option {
return Ok(match opt {
TableLoadOption::json { .. } => json::to_mem_table(t).await?,
TableLoadOption::csv { .. } => csv::to_mem_table(t)?,
TableLoadOption::csv { .. } => csv::to_mem_table(t).await?,
TableLoadOption::parquet { .. } => parquet::to_mem_table(t)?,
TableLoadOption::google_spreadsheet(_) => google_spreadsheets::to_mem_table(t).await?,
});
@ -103,7 +103,7 @@ pub async fn load(t: &TableSource) -> Result<datafusion::datasource::MemTable, C
// no format specified explictly, try to guess from file path
Ok(
match Path::new(&t.uri).extension().and_then(OsStr::to_str) {
Some("csv") => csv::to_mem_table(t)?,
Some("csv") => csv::to_mem_table(t).await?,
Some("json") => json::to_mem_table(t).await?,
Some("parquet") => parquet::to_mem_table(t)?,
Some(ext) => {