mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-20 15:18:03 +00:00
refactor(ingest): Use sqlite.Row row_factory for FileBackedCollections (#7739)
This commit is contained in:
parent
23e57fffa2
commit
ce1ac7fa12
@ -64,6 +64,7 @@ class ConnectionWrapper:
|
|||||||
filename = pathlib.Path(self._directory.name) / _DEFAULT_FILE_NAME
|
filename = pathlib.Path(self._directory.name) / _DEFAULT_FILE_NAME
|
||||||
|
|
||||||
self.conn = sqlite3.connect(filename, isolation_level=None)
|
self.conn = sqlite3.connect(filename, isolation_level=None)
|
||||||
|
self.conn.row_factory = sqlite3.Row
|
||||||
self.filename = filename
|
self.filename = filename
|
||||||
|
|
||||||
# These settings are optimized for performance.
|
# These settings are optimized for performance.
|
||||||
@ -314,7 +315,7 @@ class FileBackedDict(MutableMapping[str, _VT], Generic[_VT], Closeable):
|
|||||||
query: str,
|
query: str,
|
||||||
params: Tuple[Any, ...] = (),
|
params: Tuple[Any, ...] = (),
|
||||||
refs: Optional[List[Union["FileBackedList", "FileBackedDict"]]] = None,
|
refs: Optional[List[Union["FileBackedList", "FileBackedDict"]]] = None,
|
||||||
) -> List[Tuple[Any, ...]]:
|
) -> List[sqlite3.Row]:
|
||||||
return self._sql_query(query, params, refs).fetchall()
|
return self._sql_query(query, params, refs).fetchall()
|
||||||
|
|
||||||
def sql_query_iterator(
|
def sql_query_iterator(
|
||||||
@ -322,7 +323,7 @@ class FileBackedDict(MutableMapping[str, _VT], Generic[_VT], Closeable):
|
|||||||
query: str,
|
query: str,
|
||||||
params: Tuple[Any, ...] = (),
|
params: Tuple[Any, ...] = (),
|
||||||
refs: Optional[List[Union["FileBackedList", "FileBackedDict"]]] = None,
|
refs: Optional[List[Union["FileBackedList", "FileBackedDict"]]] = None,
|
||||||
) -> Iterator[Tuple[Any, ...]]:
|
) -> Iterator[sqlite3.Row]:
|
||||||
return self._sql_query(query, params, refs)
|
return self._sql_query(query, params, refs)
|
||||||
|
|
||||||
def _sql_query(
|
def _sql_query(
|
||||||
@ -422,7 +423,7 @@ class FileBackedList(Generic[_VT]):
|
|||||||
query: str,
|
query: str,
|
||||||
params: Tuple[Any, ...] = (),
|
params: Tuple[Any, ...] = (),
|
||||||
refs: Optional[List[Union["FileBackedList", "FileBackedDict"]]] = None,
|
refs: Optional[List[Union["FileBackedList", "FileBackedDict"]]] = None,
|
||||||
) -> List[Tuple[Any, ...]]:
|
) -> List[sqlite3.Row]:
|
||||||
return self._dict.sql_query(query, params, refs=refs)
|
return self._dict.sql_query(query, params, refs=refs)
|
||||||
|
|
||||||
def close(self) -> None:
|
def close(self) -> None:
|
||||||
|
@ -257,21 +257,19 @@ def test_shared_connection() -> None:
|
|||||||
f"SELECT y, sum(x) FROM {cache2.tablename} GROUP BY y ORDER BY y"
|
f"SELECT y, sum(x) FROM {cache2.tablename} GROUP BY y ORDER BY y"
|
||||||
)
|
)
|
||||||
assert type(iterator) == sqlite3.Cursor
|
assert type(iterator) == sqlite3.Cursor
|
||||||
assert list(iterator) == [("a", 15), ("b", 11)]
|
assert [tuple(r) for r in iterator] == [("a", 15), ("b", 11)]
|
||||||
|
|
||||||
# Test joining between the two tables.
|
# Test joining between the two tables.
|
||||||
assert (
|
rows = cache2.sql_query(
|
||||||
cache2.sql_query(
|
f"""
|
||||||
f"""
|
SELECT cache2.y, sum(cache2.x * cache1.v) FROM {cache2.tablename} cache2
|
||||||
SELECT cache2.y, sum(cache2.x * cache1.v) FROM {cache2.tablename} cache2
|
LEFT JOIN {cache1.tablename} cache1 ON cache1.key = cache2.y
|
||||||
LEFT JOIN {cache1.tablename} cache1 ON cache1.key = cache2.y
|
GROUP BY cache2.y
|
||||||
GROUP BY cache2.y
|
ORDER BY cache2.y
|
||||||
ORDER BY cache2.y
|
""",
|
||||||
""",
|
refs=[cache1],
|
||||||
refs=[cache1],
|
|
||||||
)
|
|
||||||
== [("a", 45), ("b", 55)]
|
|
||||||
)
|
)
|
||||||
|
assert [tuple(row) for row in rows] == [("a", 45), ("b", 55)]
|
||||||
|
|
||||||
assert list(cache2.items_snapshot('y = "a"')) == [
|
assert list(cache2.items_snapshot('y = "a"')) == [
|
||||||
("ref-a-1", Pair(7, "a")),
|
("ref-a-1", Pair(7, "a")),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user