tested query for cosmosdb

This commit is contained in:
Kenny Zhang 2024-11-19 14:45:59 -05:00
parent 31c0a7a316
commit c5281bb79a
3 changed files with 3 additions and 4 deletions

View File

@ -21,12 +21,12 @@ class ParquetTableEmitter(TableEmitter):
_storage: PipelineStorage
_on_error: ErrorHandlerFn
extension = "parquet"
def __init__(
self,
storage: PipelineStorage,
on_error: ErrorHandlerFn,
extension = "parquet",
):
"""Create a new Parquet Table Emitter."""
self._storage = storage

View File

@ -165,8 +165,7 @@ def to_optional_float(data: pd.Series, column_name: str | None) -> float | None:
if value is None:
return None
if not isinstance(value, float):
msg = f"value is not a float: {value} ({type(value)})"
raise ValueError(msg)
return float(value)
else:
msg = f"Column {column_name} not found in data"
raise ValueError(msg)

View File

@ -52,7 +52,7 @@ async def _load_table_from_storage(name: str, storage: PipelineStorage) -> pd.Da
return pd.read_parquet(BytesIO(await storage.get(name, as_bytes=True)))
case "json":
return pd.read_json(
StringIO(await storage.get(name, as_bytes=True)),
StringIO(await storage.get(name)),
lines=False,
orient="records",
)