diff --git a/columnq/src/table/arrow_ipc_file.rs b/columnq/src/table/arrow_ipc_file.rs index 3181a07..96e31eb 100644 --- a/columnq/src/table/arrow_ipc_file.rs +++ b/columnq/src/table/arrow_ipc_file.rs @@ -32,6 +32,7 @@ pub async fn to_mem_table( |mut r| { let arrow_file_reader = arrow::ipc::reader::FileReader::try_new(&mut r, None) .context(NewReaderSnafu) + .map_err(Box::new) .context(table::LoadArrowIpcFileSnafu)?; let schema = (*arrow_file_reader.schema()).clone(); @@ -40,6 +41,7 @@ pub async fn to_mem_table( .collect::, _>>() .map(|batches| (Some(schema), batches)) .context(CollectRecordBatchSnafu) + .map_err(Box::new) .context(table::LoadArrowIpcFileSnafu) }, dfctx @@ -57,6 +59,7 @@ pub async fn to_mem_table( .flat_map(|v| if !(v.1).is_empty() { v.0.take() } else { None }) .collect::>(), ) + .map_err(Box::new) .context(table::MergeSchemaSnafu)?, ) } @@ -69,6 +72,7 @@ pub async fn to_mem_table( .map(|v| v.1) .collect::>>(), ) + .map_err(Box::new) .context(table::CreateMemTableSnafu) } diff --git a/columnq/src/table/arrow_ipc_stream.rs b/columnq/src/table/arrow_ipc_stream.rs index 11f7602..b1a8732 100644 --- a/columnq/src/table/arrow_ipc_stream.rs +++ b/columnq/src/table/arrow_ipc_stream.rs @@ -32,7 +32,7 @@ pub async fn to_mem_table( |mut r| { let arrow_stream_reader = arrow::ipc::reader::StreamReader::try_new(&mut r, None) .context(NewReaderSnafu) - .context(table::LoadArrowIpcSnafu)?; + .map_err(Box::new).context(table::LoadArrowIpcSnafu)?; let schema = (*arrow_stream_reader.schema()).clone(); arrow_stream_reader @@ -40,7 +40,7 @@ pub async fn to_mem_table( .collect::, _>>() .map(|batches| (Some(schema), batches)) .context(CollectRecordBatchSnafu) - .context(table::LoadArrowIpcSnafu) + .map_err(Box::new).context(table::LoadArrowIpcSnafu) }, dfctx ) @@ -57,7 +57,7 @@ pub async fn to_mem_table( .flat_map(|v| if !(v.1).is_empty() { v.0.take() } else { None }) .collect::>(), ) - .context(table::MergeSchemaSnafu)?, + .map_err(Box::new).context(table::MergeSchemaSnafu)?, ) } }; @@ -69,7 +69,7 @@ pub async fn to_mem_table( .map(|v| v.1) .collect::>>(), ) - .context(table::CreateMemTableSnafu) + .map_err(Box::new).context(table::CreateMemTableSnafu) } async fn to_datafusion_table( diff --git a/columnq/src/table/csv.rs b/columnq/src/table/csv.rs index 4132a21..c1c3f80 100644 --- a/columnq/src/table/csv.rs +++ b/columnq/src/table/csv.rs @@ -49,9 +49,11 @@ async fn to_datafusion_table( return to_mem_table(&t, &dfctx).await; } let table_url = - ListingTableUrl::parse(t.get_uri_str()).with_context(|_| table::ListingTableUriSnafu { - uri: t.get_uri_str().to_string(), - })?; + ListingTableUrl::parse(t.get_uri_str()) + .map_err(Box::new) + .context(table::ListingTableUriSnafu { + uri: t.get_uri_str().to_string(), + })?; let mut options = ListingOptions::new(Arc::new(opt.as_df_csv_format())); if let Some(partition_cols) = t.datafusion_partition_cols() { options = options.with_table_partition_cols(partition_cols) @@ -70,7 +72,9 @@ async fn to_datafusion_table( .with_listing_options(options) .with_schema(schemaref); Ok(Arc::new( - ListingTable::try_new(table_config).context(table::CreateListingTableSnafu)?, + ListingTable::try_new(table_config) + .map_err(Box::new) + .context(table::CreateListingTableSnafu)?, )) } @@ -101,6 +105,7 @@ async fn to_mem_table( let (schema, record_count) = fmt .infer_schema(r, None) .context(InferSchemaSnafu) + .map_err(Box::new) .context(table::LoadCsvSnafu)?; if record_count > 0 { Ok(Some(schema)) @@ -117,39 +122,42 @@ async fn to_mem_table( Arc::new( Schema::try_merge(schemas) - .context(MergeSchemaSnafu) - .context(table::LoadCsvSnafu)?, + .map_err(Box::new) + .context(table::MergeSchemaSnafu)?, ) } }; debug!("loading csv table data..."); - let partitions: Vec> = partitions_from_table_source!( - t, - |r| -> Result, table::Error> { - let mut builder = arrow::csv::reader::ReaderBuilder::new(schema_ref.clone()) - .with_header(has_header) - .with_delimiter(delimiter) - .with_batch_size(batch_size); - if let Some(p) = projection { - builder = builder.with_projection(p.clone()); - } - let csv_reader = builder - .build(r) - .context(BuildReaderSnafu) - .context(table::LoadCsvSnafu)?; + let partitions: Vec> = + partitions_from_table_source!( + t, + |r| -> Result, table::Error> { + let mut builder = arrow::csv::reader::ReaderBuilder::new(schema_ref.clone()) + .with_header(has_header) + .with_delimiter(delimiter) + .with_batch_size(batch_size); + if let Some(p) = projection { + builder = builder.with_projection(p.clone()); + } + let csv_reader = builder.build(r) + .context(BuildReaderSnafu) + .map_err(Box::new) + .context(table::LoadCsvSnafu)?; - csv_reader - .collect::, _>>() - .context(ReadBytesSnafu) - .context(table::LoadCsvSnafu) - }, - dfctx - ) - .context(table::IoSnafu)?; + csv_reader + .collect::, _>>() + .context(ReadBytesSnafu) + .map_err(Box::new) + .context(table::LoadCsvSnafu) + }, + dfctx + ) + .context(table::IoSnafu)?; let table = Arc::new( datafusion::datasource::MemTable::try_new(schema_ref, partitions) + .map_err(Box::new) .context(table::CreateMemTableSnafu)?, ); diff --git a/columnq/src/table/database.rs b/columnq/src/table/database.rs index 3fc6a40..ce46ea1 100644 --- a/columnq/src/table/database.rs +++ b/columnq/src/table/database.rs @@ -52,19 +52,19 @@ mod imp { let queries = CXQuery::naked(format!("SELECT * FROM {}", table_name)); let source = SourceConn::try_from(t.get_uri_str()) .context(SourceSnafu) - .context(table::LoadDatabaseSnafu)?; + .map_err(Box::new).context(table::LoadDatabaseSnafu)?; let destination = connectorx::get_arrow::get_arrow(&source, None, &[queries]) .context(DestinationSnafu) - .context(table::LoadDatabaseSnafu)?; + .map_err(Box::new).context(table::LoadDatabaseSnafu)?; datafusion::datasource::MemTable::try_new( destination.arrow_schema(), vec![destination .arrow() .context(ToArrowSnafu) - .context(table::LoadDatabaseSnafu)?], + .map_err(Box::new).context(table::LoadDatabaseSnafu)?], ) - .context(table::CreateMemTableSnafu) + .map_err(Box::new).context(table::CreateMemTableSnafu) } } } diff --git a/columnq/src/table/delta.rs b/columnq/src/table/delta.rs index 860ab97..bdbea72 100644 --- a/columnq/src/table/delta.rs +++ b/columnq/src/table/delta.rs @@ -61,6 +61,7 @@ async fn update_table( .update() .await .context(UpdateTableSnafu) + .map_err(Box::new) .context(table::LoadDeltaSnafu)?; Ok(Arc::new(t) as Arc) } @@ -79,11 +80,13 @@ pub async fn to_loaded_table( let uri_str = t.get_uri_str(); let delta_table = deltalake::DeltaTableBuilder::from_valid_uri(uri_str) .context(OpenTableSnafu) + .map_err(Box::new) .context(table::LoadDeltaSnafu)? .with_allow_http(true) .load() .await .context(LoadTableSnafu) + .map_err(Box::new) .context(table::LoadDeltaSnafu)?; let parsed_uri = t.parsed_uri()?; let url_scheme = parsed_uri.scheme(); @@ -123,9 +126,9 @@ fn cast_datafusion_table( | io::BlobStoreType::S3 | io::BlobStoreType::GCS | io::BlobStoreType::FileSystem => Ok(Arc::new(delta_table)), - _ => Err(Error::InvalidUri { + _ => Err(Box::new(Error::InvalidUri { uri: delta_table.table_uri().to_string(), - }) + })) .context(table::LoadDeltaSnafu), } } @@ -134,6 +137,7 @@ fn read_partition(mut r: R, batch_size: usize) -> Result(mut r: R, batch_size: usize) -> Result>>() .context(CollectRecordBatchSnafu) + .map_err(Box::new) .context(table::LoadDeltaSnafu) } @@ -163,15 +171,18 @@ pub async fn to_mem_table( let paths = delta_table .get_file_uris() .context(LoadTableSnafu) + .map_err(Box::new) .context(table::LoadDeltaSnafu)? .collect::>(); if paths.is_empty() { - return Err(Error::EmptyTable {}).context(table::LoadDeltaSnafu); + return Err(Box::new(Error::EmptyTable {})) + .context(table::LoadDeltaSnafu); } let delta_schema = delta_table .get_schema() .context(GetSchemaSnafu) + .map_err(Box::new) .context(table::LoadDeltaSnafu)?; let path_iter = paths.iter().map(|s| s.as_ref()); @@ -196,9 +207,9 @@ pub async fn to_mem_table( .context(table::IoSnafu)? } _ => { - return Err(Error::InvalidUri { + return Err(Box::new(Error::InvalidUri { uri: delta_table.table_uri().to_string(), - }) + })) .context(table::LoadDeltaSnafu); } }; @@ -209,10 +220,12 @@ pub async fn to_mem_table( delta_schema .try_into() .context(ConvertSchemaSnafu) + .map_err(Box::new) .context(table::LoadDeltaSnafu)?, ), partitions, ) + .map_err(Box::new) .context(table::CreateMemTableSnafu)?, )) } diff --git a/columnq/src/table/excel.rs b/columnq/src/table/excel.rs index 786d334..8018851 100644 --- a/columnq/src/table/excel.rs +++ b/columnq/src/table/excel.rs @@ -357,7 +357,7 @@ pub async fn to_mem_table( let uri = t.get_uri_str(); let mut workbook: Sheets<_> = open_workbook_auto(uri) .context(OpenWorkbookSnafu) - .context(table::LoadExcelSnafu)?; + .map_err(Box::new).context(table::LoadExcelSnafu)?; let worksheet_range = match &opt.sheet_name { Some(sheet) => Some(workbook.worksheet_range(sheet)), @@ -365,19 +365,18 @@ pub async fn to_mem_table( }; if let Some(Ok(range)) = worksheet_range { - let shema = infer_schema(&range, opt, &t.schema).context(table::LoadExcelSnafu)?; + let shema = infer_schema(&range, opt, &t.schema).map_err(Box::new).context(table::LoadExcelSnafu)?; let batch = - excel_range_to_record_batch(range, opt, shema).context(table::LoadExcelSnafu)?; + excel_range_to_record_batch(range, opt, shema).map_err(Box::new).context(table::LoadExcelSnafu)?; let schema_ref = batch.schema(); let partitions = vec![vec![batch]]; datafusion::datasource::MemTable::try_new(schema_ref, partitions) - .context(table::CreateMemTableSnafu) + .map_err(Box::new).context(table::CreateMemTableSnafu) } else { - Err(Error::Load { + Err(Box::new(Error::Load { msg: "Failed to open excel file.".to_owned(), - }) - .context(table::LoadExcelSnafu) + })).context(table::LoadExcelSnafu) } } diff --git a/columnq/src/table/google_spreadsheets.rs b/columnq/src/table/google_spreadsheets.rs index 4944d19..2fa9dc8 100644 --- a/columnq/src/table/google_spreadsheets.rs +++ b/columnq/src/table/google_spreadsheets.rs @@ -322,13 +322,14 @@ static RE_GOOGLE_SHEET: LazyLock = async fn gs_get_req_contex(t: &TableSource) -> Result { let uri_str = t.get_uri_str(); if RE_GOOGLE_SHEET.captures(uri_str).is_none() { - return Err(Error::InvalidUri { + return Err(Box::new(Error::InvalidUri { uri: uri_str.to_string(), - }) - .context(table::LoadGoogleSheetSnafu); + })).context(table::LoadGoogleSheetSnafu); } - let uri = URIReference::try_from(uri_str).context(table::InvalidUriReferenceSnafu)?; + let uri = URIReference::try_from(uri_str) + .map_err(Box::new) + .context(table::InvalidUriReferenceSnafu)?; let spreadsheet_id = uri.path().segments()[2].as_str(); let opt = t @@ -339,17 +340,17 @@ async fn gs_get_req_contex(t: &TableSource) -> Result t.clone(), None => resolve_sheet_title(token_str, spreadsheet_id, &uri) .await - .context(table::LoadGoogleSheetSnafu)?, + .map_err(Box::new).context(table::LoadGoogleSheetSnafu)?, }; Ok(GetReqContext { @@ -367,26 +368,26 @@ async fn to_mem_table( .token .token() .ok_or(Error::EmptyToken {}) - .context(table::LoadGoogleSheetSnafu)?; + .map_err(Box::new).context(table::LoadGoogleSheetSnafu)?; let resp = gs_api_get(token_str, &ctx.url) .await - .context(table::LoadGoogleSheetSnafu)? + .map_err(Box::new).context(table::LoadGoogleSheetSnafu)? .error_for_status() .context(HttpStatusSnafu) - .context(table::LoadGoogleSheetSnafu)?; + .map_err(Box::new).context(table::LoadGoogleSheetSnafu)?; let sheet = resp .json::() .await .context(ParseApiRespSnafu) - .context(table::LoadGoogleSheetSnafu)?; + .map_err(Box::new).context(table::LoadGoogleSheetSnafu)?; - let batch = sheet_values_to_record_batch(&sheet.values).context(table::LoadGoogleSheetSnafu)?; + let batch = sheet_values_to_record_batch(&sheet.values).map_err(Box::new).context(table::LoadGoogleSheetSnafu)?; let schema_ref = batch.schema(); let partitions = vec![vec![batch]]; datafusion::datasource::MemTable::try_new(schema_ref, partitions) - .context(table::CreateMemTableSnafu) + .map_err(Box::new).context(table::CreateMemTableSnafu) } pub async fn to_loaded_table(t: &TableSource) -> Result { diff --git a/columnq/src/table/json.rs b/columnq/src/table/json.rs index d0a8825..f3348e4 100644 --- a/columnq/src/table/json.rs +++ b/columnq/src/table/json.rs @@ -50,6 +50,7 @@ fn json_value_from_reader(r: R) -> Result { let reader = BufReader::new(r); serde_json::from_reader(reader) .context(DeserializeSnafu) + .map_err(Box::new) .context(table::LoadJsonSnafu) } @@ -150,7 +151,8 @@ async fn to_partitions( }; if array_encoded && t.schema.is_none() { - return Err(Error::ArrayEncodedSchemaRequired {}).context(table::LoadJsonSnafu); + return Err(Box::new(Error::ArrayEncodedSchemaRequired {})) + .context(table::LoadJsonSnafu); } let pointer = match &t.option { @@ -166,27 +168,31 @@ async fn to_partitions( .iter() .map(|json_partition| { let json_rows = json_partition_to_vec(json_partition, pointer.as_deref()) + .map_err(Box::new) .context(table::LoadJsonSnafu)?; if json_rows.is_empty() { match &pointer { Some(p) => { - return Err(Error::EmptyArrayPointer { + return Err(Box::new(Error::EmptyArrayPointer { pointer: p.to_string(), - }) + })) .context(table::LoadJsonSnafu); } None => { - return Err(Error::EmptyArray {}).context(table::LoadJsonSnafu); + return Err(Box::new(Error::EmptyArray {})) + .context(table::LoadJsonSnafu); } } } let (batch_schema, partition) = json_vec_to_partition(json_rows, &t.schema, batch_size, array_encoded) + .map_err(Box::new) .context(table::LoadJsonSnafu)?; merged_schema = Some(match &merged_schema { Some(s) if s != &batch_schema => Schema::try_merge(vec![s.clone(), batch_schema]) + .map_err(Box::new) .context(table::MergeSchemaSnafu)?, _ => batch_schema, }); @@ -207,10 +213,12 @@ pub async fn to_mem_table( Arc::new( merged_schema .ok_or(Error::SchemaNotFound {}) + .map_err(Box::new) .context(table::LoadJsonSnafu)?, ), partitions, ) + .map_err(Box::new) .context(table::CreateMemTableSnafu) } diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index a234c34..a8e34a1 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -31,36 +31,36 @@ pub mod parquet; #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Failed to parse JSON: {source}"))] - LoadJson { source: json::Error }, + LoadJson { source: Box }, #[snafu(display("Failed to parse NDJSON: {source}"))] - LoadNdJson { source: ndjson::Error }, + LoadNdJson { source: Box }, #[snafu(display("Failed to load parquet: {source}"))] - LoadParquet { source: parquet::Error }, + LoadParquet { source: Box }, #[snafu(display("Failed to load csv data: {source}"))] - LoadCsv { source: csv::Error }, + LoadCsv { source: Box }, #[snafu(display("Failed to load delta table: {source}"))] - LoadDelta { source: delta::Error }, + LoadDelta { source: Box }, #[snafu(display("Failed to load Arrow IPC data: {source}"))] - LoadArrowIpc { source: arrow_ipc_stream::Error }, + LoadArrowIpc { source: Box }, #[snafu(display("Failed to load Arrow IPC file data: {source}"))] - LoadArrowIpcFile { source: arrow_ipc_file::Error }, + LoadArrowIpcFile { source: Box }, #[snafu(display("Failed to load Google Sheet data: {source}"))] - LoadGoogleSheet { source: google_spreadsheets::Error }, + LoadGoogleSheet { source: Box }, #[snafu(display("Failed to load Excel data: {source}"))] - LoadExcel { source: excel::Error }, + LoadExcel { source: Box }, #[snafu(display("Failed to load database data: {source}"))] - LoadDatabase { source: database::Error }, + LoadDatabase { source: Box }, #[snafu(display("Failed to cast IO source to memory bytes for source: {table_source}"))] MemoryCast { table_source: TableIoSource }, #[snafu(display("Failed to resolve extension: {msg}"))] Extension { msg: String }, #[snafu(display("Failed to create datafusion memory table: {source}"))] CreateMemTable { - source: datafusion::error::DataFusionError, + source: Box, }, #[snafu(display("Failed to create datafusion listing table: {source}"))] CreateListingTable { - source: datafusion::error::DataFusionError, + source: Box, }, #[snafu(display("Failed to read table data: {source}"))] Io { source: io::Error }, @@ -69,19 +69,19 @@ pub enum Error { #[snafu(display("Invalid table URI: {msg}"))] InvalidUri { msg: String }, #[snafu(display("Invalid URI: {source}"))] - InvalidUriReference { source: uriparse::URIReferenceError }, + InvalidUriReference { source: Box }, #[snafu(display("Failed to infer schema for listing table"))] InferListingTableSchema { - source: datafusion::error::DataFusionError, + source: Box, }, #[snafu(display("Failed to parse URI for listing table: {uri}"))] ListingTableUri { uri: String, - source: datafusion::error::DataFusionError, + source: Box, }, #[snafu(display("Failed to merge schema: {source}"))] MergeSchema { - source: datafusion::arrow::error::ArrowError, + source: Box, }, #[snafu(display("Table source missing required option"))] MissingOption {}, @@ -89,7 +89,7 @@ pub enum Error { Generic { msg: String }, #[snafu(display("Failed to infer table schema: {source}"))] InferSchema { - source: datafusion::error::DataFusionError, + source: Box, }, } @@ -719,21 +719,26 @@ pub async fn datafusion_get_or_infer_schema( .to_str() .expect("Failed to create file url"), ) + .map_err(Box::new) .context(InferSchemaSnafu)?; let inferred_schema = listing_options .infer_schema(&dfctx.state(), &file_url) .await + .map_err(Box::new) .context(InferSchemaSnafu)?; schemas.push( Arc::into_inner(inferred_schema) .expect("Failed to unwrap schemaref into schema on merge"), ); } - Arc::new(arrow::datatypes::Schema::try_merge(schemas).context(MergeSchemaSnafu)?) + Arc::new(arrow::datatypes::Schema::try_merge(schemas) + .map_err(Box::new) + .context(MergeSchemaSnafu)?) } (None, None) => listing_options .infer_schema(&dfctx.state(), table_url) .await + .map_err(Box::new) .context(InferSchemaSnafu)?, }) } diff --git a/columnq/src/table/ndjson.rs b/columnq/src/table/ndjson.rs index e483a69..b7ccd1d 100644 --- a/columnq/src/table/ndjson.rs +++ b/columnq/src/table/ndjson.rs @@ -32,6 +32,7 @@ fn json_schema_from_reader(r: R) -> Result { let mut reader = BufReader::new(r); let (schema, _) = infer_json_schema(&mut reader, None) .context(InferSchemaSnafu) + .map_err(Box::new) .context(table::LoadNdJsonSnafu)?; Ok(schema) } @@ -45,11 +46,13 @@ fn decode_json_from_reader( .with_batch_size(batch_size) .build(BufReader::new(r)) .context(BuildReaderSnafu) + .map_err(Box::new) .context(table::LoadNdJsonSnafu)?; let batches = batch_reader .collect::, _>>() .context(CollectBatchesSnafu) + .map_err(Box::new) .context(table::LoadNdJsonSnafu)?; Ok(batches) @@ -69,11 +72,13 @@ pub async fn to_mem_table( partitions_from_table_source!(t, json_schema_from_reader, dfctx) .context(table::IoSnafu)?; if inferred_schema.is_empty() { - return Err(Error::EmptySchema {}).context(table::LoadNdJsonSnafu); + return Err(Box::new(Error::EmptySchema {})) + .context(table::LoadNdJsonSnafu); } Arc::new( Schema::try_merge(inferred_schema) .context(InferSchemaSnafu) + .map_err(Box::new) .context(table::LoadNdJsonSnafu)?, ) } @@ -87,7 +92,7 @@ pub async fn to_mem_table( .context(table::IoSnafu)?; datafusion::datasource::MemTable::try_new(schema_ref, partitions) - .context(table::CreateMemTableSnafu) + .map_err(Box::new).context(table::CreateMemTableSnafu) } async fn to_datafusion_table( diff --git a/columnq/src/table/parquet.rs b/columnq/src/table/parquet.rs index 12564cd..334b4e6 100644 --- a/columnq/src/table/parquet.rs +++ b/columnq/src/table/parquet.rs @@ -67,7 +67,7 @@ async fn to_datafusion_table( } else { let table_url = ListingTableUrl::parse(t.get_uri_str()) .context(ParseUriSnafu) - .context(table::LoadParquetSnafu)?; + .map_err(Box::new).context(table::LoadParquetSnafu)?; let mut options = ListingOptions::new(Arc::new(ParquetFormat::default())); if let Some(partition_cols) = t.datafusion_partition_cols() { options = options.with_table_partition_cols(partition_cols) @@ -86,7 +86,7 @@ async fn to_datafusion_table( .with_listing_options(options) .with_schema(schemaref); Ok(Arc::new( - ListingTable::try_new(table_config).context(table::CreateListingTableSnafu)?, + ListingTable::try_new(table_config).map_err(Box::new).context(table::CreateListingTableSnafu)?, )) } } @@ -107,25 +107,24 @@ pub async fn to_mem_table( let mut buffer = Vec::new(); r.read_to_end(&mut buffer) .context(LoadBytesSnafu) - .context(table::LoadParquetSnafu)?; + .map_err(Box::new).context(table::LoadParquetSnafu)?; let record_batch_reader = ParquetRecordBatchReaderBuilder::try_new_with_options( bytes::Bytes::from(buffer), ArrowReaderOptions::new(), ) .context(BuildReaderSnafu) - .context(table::LoadParquetSnafu)? + .map_err(Box::new).context(table::LoadParquetSnafu)? .with_batch_size(batch_size) .build() .context(BuildReaderSnafu) - .context(table::LoadParquetSnafu)?; + .map_err(Box::new).context(table::LoadParquetSnafu)?; let batch_schema = &*record_batch_reader.schema(); schema = Some(match &schema { Some(s) if s != batch_schema => { Schema::try_merge(vec![s.clone(), batch_schema.clone()]) - .context(MergeSchemaSnafu) - .context(table::LoadParquetSnafu)? + .map_err(Box::new).context(table::MergeSchemaSnafu)? } _ => batch_schema.clone(), }); @@ -134,14 +133,15 @@ pub async fn to_mem_table( .into_iter() .collect::>>() .context(CollectBatchesSnafu) - .context(table::LoadParquetSnafu) + .map_err(Box::new).context(table::LoadParquetSnafu) }, dfctx ) .context(table::IoSnafu)?; if partitions.is_empty() { - return Err(Error::EmptyPartition {}).context(table::LoadParquetSnafu); + return Err(Box::new(Error::EmptyPartition {})) + .context(table::LoadParquetSnafu); } let table = Arc::new( @@ -149,11 +149,12 @@ pub async fn to_mem_table( Arc::new( schema .ok_or(Error::SchemaNotFound {}) + .map_err(Box::new) .context(table::LoadParquetSnafu)?, ), partitions, ) - .context(table::CreateMemTableSnafu)?, + .map_err(Box::new).context(table::CreateMemTableSnafu)?, ); Ok(table) diff --git a/roapi/src/server/flight_sql.rs b/roapi/src/server/flight_sql.rs index 455f484..cf42f7c 100644 --- a/roapi/src/server/flight_sql.rs +++ b/roapi/src/server/flight_sql.rs @@ -126,58 +126,58 @@ impl RoapiFlightSqlService { } async fn get_ctx(&self, req: &Request) -> Result { - self.check_token(req)?; + self.check_token(req).map_err(|e| *e)?; Ok(self.ctx.get_dfctx().await) } - fn pop_result(&self, handle: &str) -> Result, Status> { + fn pop_result(&self, handle: &str) -> Result, Box> { if let Some((_, result)) = self.results.remove(handle) { Ok(result) } else { - Err(Status::internal(format!( + Err(Box::new(Status::internal(format!( "Request handle not found: {handle}" - )))? + ))))? } } - fn get_plan(&self, handle: &str) -> Result { + fn get_plan(&self, handle: &str) -> Result> { if let Some(plan) = self.statements.get(handle) { Ok(plan.clone()) } else { - Err(Status::internal(format!("Plan handle not found: {handle}")))? + Err(Box::new(Status::internal(format!("Plan handle not found: {handle}"))))? } } - fn remove_plan(&self, handle: &str) -> Result<(), Status> { + fn remove_plan(&self, handle: &str) -> Result<(), Box> { self.statements.remove(&handle.to_string()); Ok(()) } - fn remove_result(&self, handle: &str) -> Result<(), Status> { + fn remove_result(&self, handle: &str) -> Result<(), Box> { self.results.remove(handle); Ok(()) } - fn check_token(&self, req: &Request) -> Result<(), Status> { + fn check_token(&self, req: &Request) -> Result<(), Box> { if let Some(token) = &self.auth_token { let metadata = req.metadata(); let auth_header = metadata .get(AUTH_HEADER) - .ok_or_else(|| Status::unauthenticated("token not found"))?; + .ok_or_else(|| Box::new(Status::unauthenticated("token not found")))?; let auth_header = auth_header .to_str() - .map_err(|e| Status::internal(format!("Error parsing header: {e}")))?; + .map_err(|e| Box::new(Status::internal(format!("Error parsing header: {e}"))))?; if !auth_header.starts_with(BEARER_PREFIX) { - Err(Status::internal("invalid auth type"))?; + Err(Box::new(Status::internal("invalid auth type")))?; } if auth_header.len() <= BEARER_PREFIX.len() { - return Err(Status::unauthenticated("invalid token")); + return Err(Box::new(Status::unauthenticated("invalid token"))); } let user_token = &auth_header[BEARER_PREFIX.len()..]; if !constant_time_eq(token.as_bytes(), user_token.as_bytes()) { - return Err(Status::unauthenticated("invalid token")); + return Err(Box::new(Status::unauthenticated("invalid token"))); } } Ok(()) @@ -279,7 +279,7 @@ impl FlightSqlService for RoapiFlightSqlService { request: Request, message: Any, ) -> Result::DoGetStream>, Status> { - self.check_token(&request)?; + self.check_token(&request).map_err(|e| *e)?; if !message.is::() { Err(Status::unimplemented(format!( @@ -296,7 +296,7 @@ impl FlightSqlService for RoapiFlightSqlService { let handle = fr.handle; info!("getting results for {handle}"); - let result = self.pop_result(&handle)?; + let result = self.pop_result(&handle).map_err(|e| *e)?; // if we get an empty result, create an empty schema let (schema, batches) = match result.first() { None => (Arc::new(Schema::empty()), vec![]), @@ -319,7 +319,7 @@ impl FlightSqlService for RoapiFlightSqlService { query: CommandStatementQuery, request: Request, ) -> Result, Status> { - self.check_token(&request)?; + self.check_token(&request).map_err(|e| *e)?; debug!("got flight_info_statement user query: {:#?}", &query); let user_query = query.query.as_str(); @@ -386,14 +386,14 @@ impl FlightSqlService for RoapiFlightSqlService { cmd: CommandPreparedStatementQuery, request: Request, ) -> Result, Status> { - self.check_token(&request)?; + self.check_token(&request).map_err(|e| *e)?; info!("get_flight_info_prepared_statement"); let handle = std::str::from_utf8(&cmd.prepared_statement_handle) .map_err(|e| internal_error!("Unable to parse uuid", e))?; let ctx = self.get_ctx(&request).await?; - let plan = self.get_plan(handle)?; + let plan = self.get_plan(handle).map_err(|e| *e)?; let state = ctx.state(); let df = DataFrame::new(state, plan); @@ -821,7 +821,7 @@ impl FlightSqlService for RoapiFlightSqlService { query: ActionCreatePreparedStatementRequest, request: Request, ) -> Result { - self.check_token(&request)?; + self.check_token(&request).map_err(|e| *e)?; let user_query = query.query.as_str(); info!("do_action_create_prepared_statement: {user_query}");