Added warnings when SQL Server storage methods are called with non-parquet keys

This commit is contained in:
Kenny Zhang 2025-04-03 15:23:14 -04:00
parent 849009d84c
commit 6fad33f316

View File

@ -80,7 +80,7 @@ class SQLServerPipelineStorage(PipelineStorage):
Iterator of tuples with table name and match groups
"""
log.info(
"searching SQL Server database %s for tables matching %s",
"Searching SQL Server database %s for tables matching %s",
self._database_name,
file_pattern.pattern,
)
@ -152,10 +152,10 @@ class SQLServerPipelineStorage(PipelineStorage):
query = f"SELECT * FROM [{table_name}]" # noqa: S608
df = pd.read_sql(query, self._connection) # noqa: PD901
return df.to_parquet()
except Exception:
log.exception("Error reading data %s", key)
else:
log.warning("Attempted to call get() on SQL Server storage with non-parquet key %s. Skipping...", key)
return None
async def set(self, key: str, value: Any, encoding: str | None = None) -> None:
@ -213,6 +213,8 @@ class SQLServerPipelineStorage(PipelineStorage):
except Exception:
self._connection.rollback()
log.exception("Error writing data %s", key)
else:
log.warning("Attempted to call set() on SQL Server storage with non-parquet key %s. Skipping...", key)
async def has(self, key: str) -> bool:
"""Check if a table/file exists in SQL Server.
@ -235,6 +237,7 @@ class SQLServerPipelineStorage(PipelineStorage):
return False
else:
# Only dataframe outputs are stored in SQL server, so return false
log.warning("Attempted to call has() on SQL Server storage with non-parquet key %s", key)
return False
async def delete(self, key: str) -> None:
@ -253,6 +256,8 @@ class SQLServerPipelineStorage(PipelineStorage):
except Exception:
self._connection.rollback()
log.exception("Error deleting %s", key)
else:
log.warning("Attempted to call delete() on SQL Server storage with non-parquet key %s. Skipping...", key)
async def clear(self) -> None:
"""Clear the pipeline storage."""
@ -293,16 +298,11 @@ class SQLServerPipelineStorage(PipelineStorage):
row = cursor.fetchone()
if row:
return get_timestamp_formatted_with_local_tz(row[0])
else:
# Get metadata entry creation date
cursor.execute("SELECT created_at FROM metadata_store WHERE metadata_key = ?", key)
row = cursor.fetchone()
if row:
return get_timestamp_formatted_with_local_tz(row[0])
except Exception:
log.exception("Error getting creation date for %s", key)
return ""
else:
log.warning("Attempted to call get_creation_date() on SQL Server storage with non-parquet key %s", key)
return ""
def _create_progress_status(