Fix clippy errors by boxing error sources

This commit is contained in:
Qingping Hou 2025-06-15 20:17:19 -07:00
parent 271ddb8763
commit d33c80a9e5
12 changed files with 160 additions and 116 deletions

View File

@ -32,6 +32,7 @@ pub async fn to_mem_table(
|mut r| {
let arrow_file_reader = arrow::ipc::reader::FileReader::try_new(&mut r, None)
.context(NewReaderSnafu)
.map_err(Box::new)
.context(table::LoadArrowIpcFileSnafu)?;
let schema = (*arrow_file_reader.schema()).clone();
@ -40,6 +41,7 @@ pub async fn to_mem_table(
.collect::<Result<Vec<RecordBatch>, _>>()
.map(|batches| (Some(schema), batches))
.context(CollectRecordBatchSnafu)
.map_err(Box::new)
.context(table::LoadArrowIpcFileSnafu)
},
dfctx
@ -57,6 +59,7 @@ pub async fn to_mem_table(
.flat_map(|v| if !(v.1).is_empty() { v.0.take() } else { None })
.collect::<Vec<_>>(),
)
.map_err(Box::new)
.context(table::MergeSchemaSnafu)?,
)
}
@ -69,6 +72,7 @@ pub async fn to_mem_table(
.map(|v| v.1)
.collect::<Vec<Vec<RecordBatch>>>(),
)
.map_err(Box::new)
.context(table::CreateMemTableSnafu)
}

View File

@ -32,7 +32,7 @@ pub async fn to_mem_table(
|mut r| {
let arrow_stream_reader = arrow::ipc::reader::StreamReader::try_new(&mut r, None)
.context(NewReaderSnafu)
.context(table::LoadArrowIpcSnafu)?;
.map_err(Box::new).context(table::LoadArrowIpcSnafu)?;
let schema = (*arrow_stream_reader.schema()).clone();
arrow_stream_reader
@ -40,7 +40,7 @@ pub async fn to_mem_table(
.collect::<Result<Vec<RecordBatch>, _>>()
.map(|batches| (Some(schema), batches))
.context(CollectRecordBatchSnafu)
.context(table::LoadArrowIpcSnafu)
.map_err(Box::new).context(table::LoadArrowIpcSnafu)
},
dfctx
)
@ -57,7 +57,7 @@ pub async fn to_mem_table(
.flat_map(|v| if !(v.1).is_empty() { v.0.take() } else { None })
.collect::<Vec<_>>(),
)
.context(table::MergeSchemaSnafu)?,
.map_err(Box::new).context(table::MergeSchemaSnafu)?,
)
}
};
@ -69,7 +69,7 @@ pub async fn to_mem_table(
.map(|v| v.1)
.collect::<Vec<Vec<RecordBatch>>>(),
)
.context(table::CreateMemTableSnafu)
.map_err(Box::new).context(table::CreateMemTableSnafu)
}
async fn to_datafusion_table(

View File

@ -49,9 +49,11 @@ async fn to_datafusion_table(
return to_mem_table(&t, &dfctx).await;
}
let table_url =
ListingTableUrl::parse(t.get_uri_str()).with_context(|_| table::ListingTableUriSnafu {
uri: t.get_uri_str().to_string(),
})?;
ListingTableUrl::parse(t.get_uri_str())
.map_err(Box::new)
.context(table::ListingTableUriSnafu {
uri: t.get_uri_str().to_string(),
})?;
let mut options = ListingOptions::new(Arc::new(opt.as_df_csv_format()));
if let Some(partition_cols) = t.datafusion_partition_cols() {
options = options.with_table_partition_cols(partition_cols)
@ -70,7 +72,9 @@ async fn to_datafusion_table(
.with_listing_options(options)
.with_schema(schemaref);
Ok(Arc::new(
ListingTable::try_new(table_config).context(table::CreateListingTableSnafu)?,
ListingTable::try_new(table_config)
.map_err(Box::new)
.context(table::CreateListingTableSnafu)?,
))
}
@ -101,6 +105,7 @@ async fn to_mem_table(
let (schema, record_count) = fmt
.infer_schema(r, None)
.context(InferSchemaSnafu)
.map_err(Box::new)
.context(table::LoadCsvSnafu)?;
if record_count > 0 {
Ok(Some(schema))
@ -117,39 +122,42 @@ async fn to_mem_table(
Arc::new(
Schema::try_merge(schemas)
.context(MergeSchemaSnafu)
.context(table::LoadCsvSnafu)?,
.map_err(Box::new)
.context(table::MergeSchemaSnafu)?,
)
}
};
debug!("loading csv table data...");
let partitions: Vec<Vec<RecordBatch>> = partitions_from_table_source!(
t,
|r| -> Result<Vec<RecordBatch>, table::Error> {
let mut builder = arrow::csv::reader::ReaderBuilder::new(schema_ref.clone())
.with_header(has_header)
.with_delimiter(delimiter)
.with_batch_size(batch_size);
if let Some(p) = projection {
builder = builder.with_projection(p.clone());
}
let csv_reader = builder
.build(r)
.context(BuildReaderSnafu)
.context(table::LoadCsvSnafu)?;
let partitions: Vec<Vec<RecordBatch>> =
partitions_from_table_source!(
t,
|r| -> Result<Vec<RecordBatch>, table::Error> {
let mut builder = arrow::csv::reader::ReaderBuilder::new(schema_ref.clone())
.with_header(has_header)
.with_delimiter(delimiter)
.with_batch_size(batch_size);
if let Some(p) = projection {
builder = builder.with_projection(p.clone());
}
let csv_reader = builder.build(r)
.context(BuildReaderSnafu)
.map_err(Box::new)
.context(table::LoadCsvSnafu)?;
csv_reader
.collect::<Result<Vec<RecordBatch>, _>>()
.context(ReadBytesSnafu)
.context(table::LoadCsvSnafu)
},
dfctx
)
.context(table::IoSnafu)?;
csv_reader
.collect::<Result<Vec<RecordBatch>, _>>()
.context(ReadBytesSnafu)
.map_err(Box::new)
.context(table::LoadCsvSnafu)
},
dfctx
)
.context(table::IoSnafu)?;
let table = Arc::new(
datafusion::datasource::MemTable::try_new(schema_ref, partitions)
.map_err(Box::new)
.context(table::CreateMemTableSnafu)?,
);

View File

@ -52,19 +52,19 @@ mod imp {
let queries = CXQuery::naked(format!("SELECT * FROM {}", table_name));
let source = SourceConn::try_from(t.get_uri_str())
.context(SourceSnafu)
.context(table::LoadDatabaseSnafu)?;
.map_err(Box::new).context(table::LoadDatabaseSnafu)?;
let destination = connectorx::get_arrow::get_arrow(&source, None, &[queries])
.context(DestinationSnafu)
.context(table::LoadDatabaseSnafu)?;
.map_err(Box::new).context(table::LoadDatabaseSnafu)?;
datafusion::datasource::MemTable::try_new(
destination.arrow_schema(),
vec![destination
.arrow()
.context(ToArrowSnafu)
.context(table::LoadDatabaseSnafu)?],
.map_err(Box::new).context(table::LoadDatabaseSnafu)?],
)
.context(table::CreateMemTableSnafu)
.map_err(Box::new).context(table::CreateMemTableSnafu)
}
}
}

View File

@ -61,6 +61,7 @@ async fn update_table(
.update()
.await
.context(UpdateTableSnafu)
.map_err(Box::new)
.context(table::LoadDeltaSnafu)?;
Ok(Arc::new(t) as Arc<dyn TableProvider>)
}
@ -79,11 +80,13 @@ pub async fn to_loaded_table(
let uri_str = t.get_uri_str();
let delta_table = deltalake::DeltaTableBuilder::from_valid_uri(uri_str)
.context(OpenTableSnafu)
.map_err(Box::new)
.context(table::LoadDeltaSnafu)?
.with_allow_http(true)
.load()
.await
.context(LoadTableSnafu)
.map_err(Box::new)
.context(table::LoadDeltaSnafu)?;
let parsed_uri = t.parsed_uri()?;
let url_scheme = parsed_uri.scheme();
@ -123,9 +126,9 @@ fn cast_datafusion_table(
| io::BlobStoreType::S3
| io::BlobStoreType::GCS
| io::BlobStoreType::FileSystem => Ok(Arc::new(delta_table)),
_ => Err(Error::InvalidUri {
_ => Err(Box::new(Error::InvalidUri {
uri: delta_table.table_uri().to_string(),
})
}))
.context(table::LoadDeltaSnafu),
}
}
@ -134,6 +137,7 @@ fn read_partition<R: Read>(mut r: R, batch_size: usize) -> Result<Vec<RecordBatc
let mut buffer = Vec::new();
r.read_to_end(&mut buffer)
.context(ReadBytesSnafu)
.map_err(Box::new)
.context(table::LoadDeltaSnafu)?;
let record_batch_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
@ -141,16 +145,20 @@ fn read_partition<R: Read>(mut r: R, batch_size: usize) -> Result<Vec<RecordBatc
ArrowReaderOptions::new().with_skip_arrow_metadata(true),
)
.context(NewReaderBuilderSnafu)
.context(table::LoadDeltaSnafu)?
.map_err(|e| table::Error::LoadDelta {
source: Box::new(e),
})?
.with_batch_size(batch_size)
.build()
.context(BuildReaderSnafu)
.map_err(Box::new)
.context(table::LoadDeltaSnafu)?;
record_batch_reader
.into_iter()
.collect::<arrow::error::Result<Vec<RecordBatch>>>()
.context(CollectRecordBatchSnafu)
.map_err(Box::new)
.context(table::LoadDeltaSnafu)
}
@ -163,15 +171,18 @@ pub async fn to_mem_table(
let paths = delta_table
.get_file_uris()
.context(LoadTableSnafu)
.map_err(Box::new)
.context(table::LoadDeltaSnafu)?
.collect::<Vec<String>>();
if paths.is_empty() {
return Err(Error::EmptyTable {}).context(table::LoadDeltaSnafu);
return Err(Box::new(Error::EmptyTable {}))
.context(table::LoadDeltaSnafu);
}
let delta_schema = delta_table
.get_schema()
.context(GetSchemaSnafu)
.map_err(Box::new)
.context(table::LoadDeltaSnafu)?;
let path_iter = paths.iter().map(|s| s.as_ref());
@ -196,9 +207,9 @@ pub async fn to_mem_table(
.context(table::IoSnafu)?
}
_ => {
return Err(Error::InvalidUri {
return Err(Box::new(Error::InvalidUri {
uri: delta_table.table_uri().to_string(),
})
}))
.context(table::LoadDeltaSnafu);
}
};
@ -209,10 +220,12 @@ pub async fn to_mem_table(
delta_schema
.try_into()
.context(ConvertSchemaSnafu)
.map_err(Box::new)
.context(table::LoadDeltaSnafu)?,
),
partitions,
)
.map_err(Box::new)
.context(table::CreateMemTableSnafu)?,
))
}

View File

@ -357,7 +357,7 @@ pub async fn to_mem_table(
let uri = t.get_uri_str();
let mut workbook: Sheets<_> = open_workbook_auto(uri)
.context(OpenWorkbookSnafu)
.context(table::LoadExcelSnafu)?;
.map_err(Box::new).context(table::LoadExcelSnafu)?;
let worksheet_range = match &opt.sheet_name {
Some(sheet) => Some(workbook.worksheet_range(sheet)),
@ -365,19 +365,18 @@ pub async fn to_mem_table(
};
if let Some(Ok(range)) = worksheet_range {
let shema = infer_schema(&range, opt, &t.schema).context(table::LoadExcelSnafu)?;
let shema = infer_schema(&range, opt, &t.schema).map_err(Box::new).context(table::LoadExcelSnafu)?;
let batch =
excel_range_to_record_batch(range, opt, shema).context(table::LoadExcelSnafu)?;
excel_range_to_record_batch(range, opt, shema).map_err(Box::new).context(table::LoadExcelSnafu)?;
let schema_ref = batch.schema();
let partitions = vec![vec![batch]];
datafusion::datasource::MemTable::try_new(schema_ref, partitions)
.context(table::CreateMemTableSnafu)
.map_err(Box::new).context(table::CreateMemTableSnafu)
} else {
Err(Error::Load {
Err(Box::new(Error::Load {
msg: "Failed to open excel file.".to_owned(),
})
.context(table::LoadExcelSnafu)
})).context(table::LoadExcelSnafu)
}
}

View File

@ -322,13 +322,14 @@ static RE_GOOGLE_SHEET: LazyLock<Regex> =
async fn gs_get_req_contex(t: &TableSource) -> Result<GetReqContext, table::Error> {
let uri_str = t.get_uri_str();
if RE_GOOGLE_SHEET.captures(uri_str).is_none() {
return Err(Error::InvalidUri {
return Err(Box::new(Error::InvalidUri {
uri: uri_str.to_string(),
})
.context(table::LoadGoogleSheetSnafu);
})).context(table::LoadGoogleSheetSnafu);
}
let uri = URIReference::try_from(uri_str).context(table::InvalidUriReferenceSnafu)?;
let uri = URIReference::try_from(uri_str)
.map_err(Box::new)
.context(table::InvalidUriReferenceSnafu)?;
let spreadsheet_id = uri.path().segments()[2].as_str();
let opt = t
@ -339,17 +340,17 @@ async fn gs_get_req_contex(t: &TableSource) -> Result<GetReqContext, table::Erro
let token = fetch_auth_token(opt)
.await
.context(table::LoadGoogleSheetSnafu)?;
.map_err(Box::new).context(table::LoadGoogleSheetSnafu)?;
let token_str = token
.token()
.ok_or(Error::EmptyToken {})
.context(table::LoadGoogleSheetSnafu)?;
.map_err(Box::new).context(table::LoadGoogleSheetSnafu)?;
let sheet_title = match &opt.sheet_title {
Some(t) => t.clone(),
None => resolve_sheet_title(token_str, spreadsheet_id, &uri)
.await
.context(table::LoadGoogleSheetSnafu)?,
.map_err(Box::new).context(table::LoadGoogleSheetSnafu)?,
};
Ok(GetReqContext {
@ -367,26 +368,26 @@ async fn to_mem_table(
.token
.token()
.ok_or(Error::EmptyToken {})
.context(table::LoadGoogleSheetSnafu)?;
.map_err(Box::new).context(table::LoadGoogleSheetSnafu)?;
let resp = gs_api_get(token_str, &ctx.url)
.await
.context(table::LoadGoogleSheetSnafu)?
.map_err(Box::new).context(table::LoadGoogleSheetSnafu)?
.error_for_status()
.context(HttpStatusSnafu)
.context(table::LoadGoogleSheetSnafu)?;
.map_err(Box::new).context(table::LoadGoogleSheetSnafu)?;
let sheet = resp
.json::<SpreadsheetValues>()
.await
.context(ParseApiRespSnafu)
.context(table::LoadGoogleSheetSnafu)?;
.map_err(Box::new).context(table::LoadGoogleSheetSnafu)?;
let batch = sheet_values_to_record_batch(&sheet.values).context(table::LoadGoogleSheetSnafu)?;
let batch = sheet_values_to_record_batch(&sheet.values).map_err(Box::new).context(table::LoadGoogleSheetSnafu)?;
let schema_ref = batch.schema();
let partitions = vec![vec![batch]];
datafusion::datasource::MemTable::try_new(schema_ref, partitions)
.context(table::CreateMemTableSnafu)
.map_err(Box::new).context(table::CreateMemTableSnafu)
}
pub async fn to_loaded_table(t: &TableSource) -> Result<LoadedTable, table::Error> {

View File

@ -50,6 +50,7 @@ fn json_value_from_reader<R: Read>(r: R) -> Result<Value, table::Error> {
let reader = BufReader::new(r);
serde_json::from_reader(reader)
.context(DeserializeSnafu)
.map_err(Box::new)
.context(table::LoadJsonSnafu)
}
@ -150,7 +151,8 @@ async fn to_partitions(
};
if array_encoded && t.schema.is_none() {
return Err(Error::ArrayEncodedSchemaRequired {}).context(table::LoadJsonSnafu);
return Err(Box::new(Error::ArrayEncodedSchemaRequired {}))
.context(table::LoadJsonSnafu);
}
let pointer = match &t.option {
@ -166,27 +168,31 @@ async fn to_partitions(
.iter()
.map(|json_partition| {
let json_rows = json_partition_to_vec(json_partition, pointer.as_deref())
.map_err(Box::new)
.context(table::LoadJsonSnafu)?;
if json_rows.is_empty() {
match &pointer {
Some(p) => {
return Err(Error::EmptyArrayPointer {
return Err(Box::new(Error::EmptyArrayPointer {
pointer: p.to_string(),
})
}))
.context(table::LoadJsonSnafu);
}
None => {
return Err(Error::EmptyArray {}).context(table::LoadJsonSnafu);
return Err(Box::new(Error::EmptyArray {}))
.context(table::LoadJsonSnafu);
}
}
}
let (batch_schema, partition) =
json_vec_to_partition(json_rows, &t.schema, batch_size, array_encoded)
.map_err(Box::new)
.context(table::LoadJsonSnafu)?;
merged_schema = Some(match &merged_schema {
Some(s) if s != &batch_schema => Schema::try_merge(vec![s.clone(), batch_schema])
.map_err(Box::new)
.context(table::MergeSchemaSnafu)?,
_ => batch_schema,
});
@ -207,10 +213,12 @@ pub async fn to_mem_table(
Arc::new(
merged_schema
.ok_or(Error::SchemaNotFound {})
.map_err(Box::new)
.context(table::LoadJsonSnafu)?,
),
partitions,
)
.map_err(Box::new)
.context(table::CreateMemTableSnafu)
}

View File

@ -31,36 +31,36 @@ pub mod parquet;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Failed to parse JSON: {source}"))]
LoadJson { source: json::Error },
LoadJson { source: Box<json::Error> },
#[snafu(display("Failed to parse NDJSON: {source}"))]
LoadNdJson { source: ndjson::Error },
LoadNdJson { source: Box<ndjson::Error> },
#[snafu(display("Failed to load parquet: {source}"))]
LoadParquet { source: parquet::Error },
LoadParquet { source: Box<parquet::Error> },
#[snafu(display("Failed to load csv data: {source}"))]
LoadCsv { source: csv::Error },
LoadCsv { source: Box<csv::Error> },
#[snafu(display("Failed to load delta table: {source}"))]
LoadDelta { source: delta::Error },
LoadDelta { source: Box<delta::Error> },
#[snafu(display("Failed to load Arrow IPC data: {source}"))]
LoadArrowIpc { source: arrow_ipc_stream::Error },
LoadArrowIpc { source: Box<arrow_ipc_stream::Error> },
#[snafu(display("Failed to load Arrow IPC file data: {source}"))]
LoadArrowIpcFile { source: arrow_ipc_file::Error },
LoadArrowIpcFile { source: Box<arrow_ipc_file::Error> },
#[snafu(display("Failed to load Google Sheet data: {source}"))]
LoadGoogleSheet { source: google_spreadsheets::Error },
LoadGoogleSheet { source: Box<google_spreadsheets::Error> },
#[snafu(display("Failed to load Excel data: {source}"))]
LoadExcel { source: excel::Error },
LoadExcel { source: Box<excel::Error> },
#[snafu(display("Failed to load database data: {source}"))]
LoadDatabase { source: database::Error },
LoadDatabase { source: Box<database::Error> },
#[snafu(display("Failed to cast IO source to memory bytes for source: {table_source}"))]
MemoryCast { table_source: TableIoSource },
#[snafu(display("Failed to resolve extension: {msg}"))]
Extension { msg: String },
#[snafu(display("Failed to create datafusion memory table: {source}"))]
CreateMemTable {
source: datafusion::error::DataFusionError,
source: Box<datafusion::error::DataFusionError>,
},
#[snafu(display("Failed to create datafusion listing table: {source}"))]
CreateListingTable {
source: datafusion::error::DataFusionError,
source: Box<datafusion::error::DataFusionError>,
},
#[snafu(display("Failed to read table data: {source}"))]
Io { source: io::Error },
@ -69,19 +69,19 @@ pub enum Error {
#[snafu(display("Invalid table URI: {msg}"))]
InvalidUri { msg: String },
#[snafu(display("Invalid URI: {source}"))]
InvalidUriReference { source: uriparse::URIReferenceError },
InvalidUriReference { source: Box<uriparse::URIReferenceError> },
#[snafu(display("Failed to infer schema for listing table"))]
InferListingTableSchema {
source: datafusion::error::DataFusionError,
source: Box<datafusion::error::DataFusionError>,
},
#[snafu(display("Failed to parse URI for listing table: {uri}"))]
ListingTableUri {
uri: String,
source: datafusion::error::DataFusionError,
source: Box<datafusion::error::DataFusionError>,
},
#[snafu(display("Failed to merge schema: {source}"))]
MergeSchema {
source: datafusion::arrow::error::ArrowError,
source: Box<datafusion::arrow::error::ArrowError>,
},
#[snafu(display("Table source missing required option"))]
MissingOption {},
@ -89,7 +89,7 @@ pub enum Error {
Generic { msg: String },
#[snafu(display("Failed to infer table schema: {source}"))]
InferSchema {
source: datafusion::error::DataFusionError,
source: Box<datafusion::error::DataFusionError>,
},
}
@ -719,21 +719,26 @@ pub async fn datafusion_get_or_infer_schema(
.to_str()
.expect("Failed to create file url"),
)
.map_err(Box::new)
.context(InferSchemaSnafu)?;
let inferred_schema = listing_options
.infer_schema(&dfctx.state(), &file_url)
.await
.map_err(Box::new)
.context(InferSchemaSnafu)?;
schemas.push(
Arc::into_inner(inferred_schema)
.expect("Failed to unwrap schemaref into schema on merge"),
);
}
Arc::new(arrow::datatypes::Schema::try_merge(schemas).context(MergeSchemaSnafu)?)
Arc::new(arrow::datatypes::Schema::try_merge(schemas)
.map_err(Box::new)
.context(MergeSchemaSnafu)?)
}
(None, None) => listing_options
.infer_schema(&dfctx.state(), table_url)
.await
.map_err(Box::new)
.context(InferSchemaSnafu)?,
})
}

View File

@ -32,6 +32,7 @@ fn json_schema_from_reader<R: Read>(r: R) -> Result<Schema, table::Error> {
let mut reader = BufReader::new(r);
let (schema, _) = infer_json_schema(&mut reader, None)
.context(InferSchemaSnafu)
.map_err(Box::new)
.context(table::LoadNdJsonSnafu)?;
Ok(schema)
}
@ -45,11 +46,13 @@ fn decode_json_from_reader<R: Read>(
.with_batch_size(batch_size)
.build(BufReader::new(r))
.context(BuildReaderSnafu)
.map_err(Box::new)
.context(table::LoadNdJsonSnafu)?;
let batches = batch_reader
.collect::<Result<Vec<RecordBatch>, _>>()
.context(CollectBatchesSnafu)
.map_err(Box::new)
.context(table::LoadNdJsonSnafu)?;
Ok(batches)
@ -69,11 +72,13 @@ pub async fn to_mem_table(
partitions_from_table_source!(t, json_schema_from_reader, dfctx)
.context(table::IoSnafu)?;
if inferred_schema.is_empty() {
return Err(Error::EmptySchema {}).context(table::LoadNdJsonSnafu);
return Err(Box::new(Error::EmptySchema {}))
.context(table::LoadNdJsonSnafu);
}
Arc::new(
Schema::try_merge(inferred_schema)
.context(InferSchemaSnafu)
.map_err(Box::new)
.context(table::LoadNdJsonSnafu)?,
)
}
@ -87,7 +92,7 @@ pub async fn to_mem_table(
.context(table::IoSnafu)?;
datafusion::datasource::MemTable::try_new(schema_ref, partitions)
.context(table::CreateMemTableSnafu)
.map_err(Box::new).context(table::CreateMemTableSnafu)
}
async fn to_datafusion_table(

View File

@ -67,7 +67,7 @@ async fn to_datafusion_table(
} else {
let table_url = ListingTableUrl::parse(t.get_uri_str())
.context(ParseUriSnafu)
.context(table::LoadParquetSnafu)?;
.map_err(Box::new).context(table::LoadParquetSnafu)?;
let mut options = ListingOptions::new(Arc::new(ParquetFormat::default()));
if let Some(partition_cols) = t.datafusion_partition_cols() {
options = options.with_table_partition_cols(partition_cols)
@ -86,7 +86,7 @@ async fn to_datafusion_table(
.with_listing_options(options)
.with_schema(schemaref);
Ok(Arc::new(
ListingTable::try_new(table_config).context(table::CreateListingTableSnafu)?,
ListingTable::try_new(table_config).map_err(Box::new).context(table::CreateListingTableSnafu)?,
))
}
}
@ -107,25 +107,24 @@ pub async fn to_mem_table(
let mut buffer = Vec::new();
r.read_to_end(&mut buffer)
.context(LoadBytesSnafu)
.context(table::LoadParquetSnafu)?;
.map_err(Box::new).context(table::LoadParquetSnafu)?;
let record_batch_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
bytes::Bytes::from(buffer),
ArrowReaderOptions::new(),
)
.context(BuildReaderSnafu)
.context(table::LoadParquetSnafu)?
.map_err(Box::new).context(table::LoadParquetSnafu)?
.with_batch_size(batch_size)
.build()
.context(BuildReaderSnafu)
.context(table::LoadParquetSnafu)?;
.map_err(Box::new).context(table::LoadParquetSnafu)?;
let batch_schema = &*record_batch_reader.schema();
schema = Some(match &schema {
Some(s) if s != batch_schema => {
Schema::try_merge(vec![s.clone(), batch_schema.clone()])
.context(MergeSchemaSnafu)
.context(table::LoadParquetSnafu)?
.map_err(Box::new).context(table::MergeSchemaSnafu)?
}
_ => batch_schema.clone(),
});
@ -134,14 +133,15 @@ pub async fn to_mem_table(
.into_iter()
.collect::<arrow::error::Result<Vec<RecordBatch>>>()
.context(CollectBatchesSnafu)
.context(table::LoadParquetSnafu)
.map_err(Box::new).context(table::LoadParquetSnafu)
},
dfctx
)
.context(table::IoSnafu)?;
if partitions.is_empty() {
return Err(Error::EmptyPartition {}).context(table::LoadParquetSnafu);
return Err(Box::new(Error::EmptyPartition {}))
.context(table::LoadParquetSnafu);
}
let table = Arc::new(
@ -149,11 +149,12 @@ pub async fn to_mem_table(
Arc::new(
schema
.ok_or(Error::SchemaNotFound {})
.map_err(Box::new)
.context(table::LoadParquetSnafu)?,
),
partitions,
)
.context(table::CreateMemTableSnafu)?,
.map_err(Box::new).context(table::CreateMemTableSnafu)?,
);
Ok(table)

View File

@ -126,58 +126,58 @@ impl<H: RoapiContext> RoapiFlightSqlService<H> {
}
async fn get_ctx<T>(&self, req: &Request<T>) -> Result<SessionContext, Status> {
self.check_token(req)?;
self.check_token(req).map_err(|e| *e)?;
Ok(self.ctx.get_dfctx().await)
}
fn pop_result(&self, handle: &str) -> Result<Vec<RecordBatch>, Status> {
fn pop_result(&self, handle: &str) -> Result<Vec<RecordBatch>, Box<Status>> {
if let Some((_, result)) = self.results.remove(handle) {
Ok(result)
} else {
Err(Status::internal(format!(
Err(Box::new(Status::internal(format!(
"Request handle not found: {handle}"
)))?
))))?
}
}
fn get_plan(&self, handle: &str) -> Result<LogicalPlan, Status> {
fn get_plan(&self, handle: &str) -> Result<LogicalPlan, Box<Status>> {
if let Some(plan) = self.statements.get(handle) {
Ok(plan.clone())
} else {
Err(Status::internal(format!("Plan handle not found: {handle}")))?
Err(Box::new(Status::internal(format!("Plan handle not found: {handle}"))))?
}
}
fn remove_plan(&self, handle: &str) -> Result<(), Status> {
fn remove_plan(&self, handle: &str) -> Result<(), Box<Status>> {
self.statements.remove(&handle.to_string());
Ok(())
}
fn remove_result(&self, handle: &str) -> Result<(), Status> {
fn remove_result(&self, handle: &str) -> Result<(), Box<Status>> {
self.results.remove(handle);
Ok(())
}
fn check_token<T>(&self, req: &Request<T>) -> Result<(), Status> {
fn check_token<T>(&self, req: &Request<T>) -> Result<(), Box<Status>> {
if let Some(token) = &self.auth_token {
let metadata = req.metadata();
let auth_header = metadata
.get(AUTH_HEADER)
.ok_or_else(|| Status::unauthenticated("token not found"))?;
.ok_or_else(|| Box::new(Status::unauthenticated("token not found")))?;
let auth_header = auth_header
.to_str()
.map_err(|e| Status::internal(format!("Error parsing header: {e}")))?;
.map_err(|e| Box::new(Status::internal(format!("Error parsing header: {e}"))))?;
if !auth_header.starts_with(BEARER_PREFIX) {
Err(Status::internal("invalid auth type"))?;
Err(Box::new(Status::internal("invalid auth type")))?;
}
if auth_header.len() <= BEARER_PREFIX.len() {
return Err(Status::unauthenticated("invalid token"));
return Err(Box::new(Status::unauthenticated("invalid token")));
}
let user_token = &auth_header[BEARER_PREFIX.len()..];
if !constant_time_eq(token.as_bytes(), user_token.as_bytes()) {
return Err(Status::unauthenticated("invalid token"));
return Err(Box::new(Status::unauthenticated("invalid token")));
}
}
Ok(())
@ -279,7 +279,7 @@ impl<H: RoapiContext> FlightSqlService for RoapiFlightSqlService<H> {
request: Request<Ticket>,
message: Any,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
self.check_token(&request)?;
self.check_token(&request).map_err(|e| *e)?;
if !message.is::<FetchResults>() {
Err(Status::unimplemented(format!(
@ -296,7 +296,7 @@ impl<H: RoapiContext> FlightSqlService for RoapiFlightSqlService<H> {
let handle = fr.handle;
info!("getting results for {handle}");
let result = self.pop_result(&handle)?;
let result = self.pop_result(&handle).map_err(|e| *e)?;
// if we get an empty result, create an empty schema
let (schema, batches) = match result.first() {
None => (Arc::new(Schema::empty()), vec![]),
@ -319,7 +319,7 @@ impl<H: RoapiContext> FlightSqlService for RoapiFlightSqlService<H> {
query: CommandStatementQuery,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
self.check_token(&request)?;
self.check_token(&request).map_err(|e| *e)?;
debug!("got flight_info_statement user query: {:#?}", &query);
let user_query = query.query.as_str();
@ -386,14 +386,14 @@ impl<H: RoapiContext> FlightSqlService for RoapiFlightSqlService<H> {
cmd: CommandPreparedStatementQuery,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
self.check_token(&request)?;
self.check_token(&request).map_err(|e| *e)?;
info!("get_flight_info_prepared_statement");
let handle = std::str::from_utf8(&cmd.prepared_statement_handle)
.map_err(|e| internal_error!("Unable to parse uuid", e))?;
let ctx = self.get_ctx(&request).await?;
let plan = self.get_plan(handle)?;
let plan = self.get_plan(handle).map_err(|e| *e)?;
let state = ctx.state();
let df = DataFrame::new(state, plan);
@ -821,7 +821,7 @@ impl<H: RoapiContext> FlightSqlService for RoapiFlightSqlService<H> {
query: ActionCreatePreparedStatementRequest,
request: Request<Action>,
) -> Result<ActionCreatePreparedStatementResult, Status> {
self.check_token(&request)?;
self.check_token(&request).map_err(|e| *e)?;
let user_query = query.query.as_str();
info!("do_action_create_prepared_statement: {user_query}");