# Roman/dry ingest pipeline step (#3203)

### Description
The main goal of this change is to remove the duplicate code that each ingest
pipeline step had to carry in order to support both async and non-async
execution. Each step now implements a single `_run_async` that receives the
process callable and dispatches on whether it is a coroutine function.
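
Below is a minimal, runnable sketch of that shared pattern, condensed from the diffs that follow. The `PipelineStep` here is a stripped-down stand-in for the real base class, and `add_async` is a hypothetical example callable:

```python
import asyncio
from typing import Any, Callable, Optional


class PipelineStep:
    """Stripped-down sketch of the shared sync/async dispatch."""

    def __init__(self, semaphore: Optional[asyncio.Semaphore] = None):
        # Mirrors the optional concurrency throttle on the pipeline context.
        self.semaphore = semaphore

    def run(self, fn: Callable, **kwargs: Any) -> Any:
        # The sync entrypoint drives the async one, so each step defines its
        # logic once instead of in a _run/_run_async pair.
        return asyncio.run(self._run_async(fn=fn, **kwargs))

    async def _run_async(self, fn: Callable, **kwargs: Any) -> Any:
        # Plain callables are invoked directly; coroutine functions are
        # awaited, under the semaphore when one is configured.
        if not asyncio.iscoroutinefunction(fn):
            return fn(**kwargs)
        if self.semaphore:
            async with self.semaphore:
                return await fn(**kwargs)
        return await fn(**kwargs)


async def add_async(x: int) -> int:
    return x + 1


# The same entrypoint handles both kinds of callables:
print(PipelineStep().run(lambda x: x + 1, x=1))                # -> 2
print(PipelineStep(asyncio.Semaphore(2)).run(add_async, x=1))  # -> 2
```

Routing everything through the async path keeps a single code path per step; sync process implementations are simply invoked inline.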

Additional bugs found and fixed along the way:
* The ingest loggers weren't being instantiated correctly. They are now
instantiated at the start of a pipeline run, as soon as the verbosity level
can be determined, and handlers are only added once so repeated calls don't
duplicate log lines.
* The `requires_dependencies` wrapper wasn't wrapping async functions
correctly. It now dispatches so that `asyncio.iscoroutinefunction()` triggers
correctly (see the sketch after this list).
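
The essence of that decorator fix, as a minimal sketch: build both a sync and an async wrapper around a shared check, and return whichever matches the wrapped function. The `requires_dependencies_sketch` name and the no-op `run_check` are simplified stand-ins; the real decorator (last diff below) takes the dependency list and raises `ImportError` for anything missing:

```python
import asyncio
from functools import wraps


def requires_dependencies_sketch(func):
    def run_check():
        # Stand-in: the real check raises ImportError for missing deps.
        pass

    @wraps(func)
    def wrapper(*args, **kwargs):
        run_check()
        return func(*args, **kwargs)

    @wraps(func)
    async def wrapper_async(*args, **kwargs):
        run_check()
        return await func(*args, **kwargs)

    # The fix: return an async wrapper for coroutine functions. Wrapping an
    # async function in the sync wrapper made asyncio.iscoroutinefunction()
    # report False on the decorated function, so callers never awaited it.
    return wrapper_async if asyncio.iscoroutinefunction(func) else wrapper
```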
Roman Isecke authored on 2024-06-14 09:46:44 -04:00 (committed by GitHub)
commit a6c09ec621 · parent 29e64eb281
14 changed files with 129 additions and 155 deletions

View File

```diff
@@ -1,4 +1,4 @@
-## 0.14.6-dev6
+## 0.14.6-dev7

 ### Enhancements
```

View File

```diff
@@ -1 +1 @@
-__version__ = "0.14.6-dev6"  # pragma: no cover
+__version__ = "0.14.6-dev7"  # pragma: no cover
```

View File

```diff
@@ -24,7 +24,9 @@ download_path = work_dir / "download"
 if __name__ == "__main__":
     logger.info(f"Writing all content in: {work_dir.resolve()}")
     Pipeline.from_configs(
-        context=ProcessorConfig(work_dir=str(work_dir.resolve()), tqdm=True),
+        context=ProcessorConfig(
+            work_dir=str(work_dir.resolve()), tqdm=True, reprocess=True, verbose=True
+        ),
         indexer_config=S3IndexerConfig(remote_url="s3://utic-dev-tech-fixtures/small-pdf-set/"),
         downloader_config=S3DownloaderConfig(download_dir=download_path),
         source_connection_config=S3ConnectionConfig(anonymous=True),
```

View File

```diff
@@ -84,7 +84,8 @@ def redact_jsons(s: str) -> str:
         try:
             formatted_j = json.dumps(json.loads(j))
         except json.JSONDecodeError:
-            formatted_j = json.dumps(ast.literal_eval(j))
+            lit = ast.literal_eval(j)
+            formatted_j = json.dumps(lit)
         hidden_j = json.dumps(hide_sensitive_fields(json.loads(formatted_j)))
         s = s.replace(j, hidden_j)
     return s
@@ -112,7 +113,8 @@ def make_default_logger(level: int) -> Logger:
     handler.name = "ingest_log_handler"
     formatter = SensitiveFormatter("%(asctime)s %(processName)-10s %(levelname)-8s %(message)s")
     handler.setFormatter(formatter)
-    logger.addHandler(handler)
+    if handler.name not in [h.name for h in logger.handlers]:
+        logger.addHandler(handler)
     logger.setLevel(level)
     remove_root_handlers(logger)
     return logger
```

View File

```diff
@@ -6,13 +6,13 @@ from dataclasses import dataclass
 from functools import wraps
 from pathlib import Path
 from time import time
-from typing import Any, Optional, TypeVar
+from typing import Any, Callable, Optional, TypeVar

 from tqdm import tqdm
 from tqdm.asyncio import tqdm as tqdm_asyncio

 from unstructured.ingest.v2.interfaces import BaseProcess, ProcessorConfig
-from unstructured.ingest.v2.logger import logger
+from unstructured.ingest.v2.logger import logger, make_default_logger

 BaseProcessT = TypeVar("BaseProcessT", bound=BaseProcess)
 iterable_input = list[dict[str, Any]]
@@ -98,7 +98,7 @@ class PipelineStep(ABC):
     def _set_log_level(self, log_level: int):
         # Set the log level for each spawned process when using multiprocessing pool
-        logger.setLevel(log_level)
+        make_default_logger(log_level)

     @timed
     def __call__(self, iterable: Optional[iterable_input] = None) -> Any:
@@ -113,15 +113,16 @@ class PipelineStep(ABC):
             return self.process_async(iterable=iterable)
         return self.process_multiprocess(iterable=iterable)

-    def _run(self, *args, **kwargs: Any) -> Optional[Any]:
+    def _run(self, fn: Callable, **kwargs: Any) -> Optional[Any]:
+        return asyncio.run(self.run_async(_fn=fn, **kwargs))
+
+    async def _run_async(self, fn: Callable, **kwargs: Any) -> Optional[Any]:
         raise NotImplementedError

-    async def _run_async(self, *args, **kwargs: Any) -> Optional[Any]:
-        raise NotImplementedError
-
-    def run(self, *args, **kwargs: Any) -> Optional[Any]:
+    def run(self, _fn: Optional[Callable] = None, **kwargs: Any) -> Optional[Any]:
         try:
-            return self._run(*args, **kwargs)
+            fn = _fn or self.process.run
+            return self._run(fn=fn, **kwargs)
         except Exception as e:
             logger.error(f"Exception raised while running {self.identifier}", exc_info=e)
             if "file_data_path" in kwargs:
@@ -130,9 +131,10 @@ class PipelineStep(ABC):
                 raise e
             return None

-    async def run_async(self, *args, **kwargs: Any) -> Optional[Any]:
+    async def run_async(self, _fn: Optional[Callable] = None, **kwargs: Any) -> Optional[Any]:
         try:
-            return await self._run_async(*args, **kwargs)
+            fn = _fn or self.process.run_async
+            return await self._run_async(fn=fn, **kwargs)
         except Exception as e:
             logger.error(f"Exception raised while running {self.identifier}", exc_info=e)
             if "file_data_path" in kwargs:
```

View File

```diff
@@ -5,7 +5,7 @@ from time import time
 from typing import Any, Optional, Union

 from unstructured.ingest.v2.interfaces import ProcessorConfig
-from unstructured.ingest.v2.logger import logger
+from unstructured.ingest.v2.logger import logger, make_default_logger
 from unstructured.ingest.v2.pipeline.steps.chunk import Chunker, ChunkStep
 from unstructured.ingest.v2.pipeline.steps.download import DownloaderT, DownloadStep
 from unstructured.ingest.v2.pipeline.steps.embed import Embedder, EmbedStep
@@ -59,7 +59,7 @@ class Pipeline:
         stager: UploadStager = None,
         uploader: Uploader = None,
     ):
-        logger.setLevel(level=logging.DEBUG if self.context.verbose else logging.INFO)
+        make_default_logger(level=logging.DEBUG if self.context.verbose else logging.INFO)
         self.indexer_step = IndexStep(process=indexer, context=self.context)
         self.downloader_step = DownloadStep(process=downloader, context=self.context)
         self.partitioner_step = PartitionStep(process=partitioner, context=self.context)
```

View File

```diff
@@ -1,8 +1,9 @@
+import asyncio
 import hashlib
 import json
 from dataclasses import dataclass
 from pathlib import Path
-from typing import Optional, TypedDict
+from typing import Callable, Optional, TypedDict

 from unstructured.ingest.v2.interfaces import FileData
 from unstructured.ingest.v2.logger import logger
@@ -53,32 +54,23 @@ class ChunkStep(PipelineStep):
             logger.debug(f"Writing chunker output to: {output_filepath}")
             json.dump(chunked_content, f, indent=2)

-    def _run(self, path: str, file_data_path: str) -> ChunkStepResponse:
+    async def _run_async(
+        self, fn: Callable, path: str, file_data_path: str, **kwargs
+    ) -> ChunkStepResponse:
         path = Path(path)
         file_data = FileData.from_file(path=file_data_path)
         output_filepath = self.get_output_filepath(filename=path)
         if not self.should_chunk(filepath=output_filepath, file_data=file_data):
             logger.debug(f"Skipping chunking, output already exists: {output_filepath}")
             return ChunkStepResponse(file_data_path=file_data_path, path=str(output_filepath))
-        chunked_content_raw = self.process.run(elements_filepath=path)
-        self._save_output(
-            output_filepath=str(output_filepath),
-            chunked_content=elements_to_dicts(chunked_content_raw),
-        )
-        return ChunkStepResponse(file_data_path=file_data_path, path=str(output_filepath))
-
-    async def _run_async(self, path: str, file_data_path: str) -> ChunkStepResponse:
-        path = Path(path)
-        file_data = FileData.from_file(path=file_data_path)
-        output_filepath = self.get_output_filepath(filename=path)
-        if not self.should_chunk(filepath=output_filepath, file_data=file_data):
-            logger.debug(f"Skipping chunking, output already exists: {output_filepath}")
-            return ChunkStepResponse(file_data_path=file_data_path, path=str(output_filepath))
-        if semaphore := self.context.semaphore:
+        fn_kwargs = {"elements_filepath": path}
+        if not asyncio.iscoroutinefunction(fn):
+            chunked_content_raw = fn(**fn_kwargs)
+        elif semaphore := self.context.semaphore:
             async with semaphore:
-                chunked_content_raw = await self.process.run_async(elements_filepath=path)
+                chunked_content_raw = await fn(**fn_kwargs)
         else:
-            chunked_content_raw = await self.process.run_async(elements_filepath=path)
+            chunked_content_raw = await fn(**fn_kwargs)
         self._save_output(
             output_filepath=str(output_filepath),
             chunked_content=elements_to_dicts(chunked_content_raw),
```

View File

```diff
@@ -1,7 +1,8 @@
+import asyncio
 import hashlib
 import json
 from dataclasses import dataclass
-from typing import Optional, TypedDict, TypeVar
+from typing import Callable, Optional, TypedDict, TypeVar

 from unstructured.ingest.v2.interfaces import FileData, download_responses
 from unstructured.ingest.v2.interfaces.downloader import Downloader
@@ -55,7 +56,7 @@ class DownloadStep(PipelineStep):
         if self.context.re_download:
             return True
         download_path = self.process.get_download_path(file_data=file_data)
-        if not download_path.exists():
+        if not download_path or not download_path.exists():
             return True
         if (
             download_path.is_file()
@@ -69,6 +70,24 @@ class DownloadStep(PipelineStep):
             return True
         return False

+    async def _run_async(self, fn: Callable, file_data_path: str) -> list[DownloadStepResponse]:
+        file_data = FileData.from_file(path=file_data_path)
+        download_path = self.process.get_download_path(file_data=file_data)
+        if not self.should_download(file_data=file_data, file_data_path=file_data_path):
+            logger.debug(f"Skipping download, file already exists locally: {download_path}")
+            return [DownloadStepResponse(file_data_path=file_data_path, path=str(download_path))]
+        fn_kwargs = {"file_data": file_data}
+        if not asyncio.iscoroutinefunction(fn):
+            download_results = fn(**fn_kwargs)
+        elif semaphore := self.context.semaphore:
+            async with semaphore:
+                download_results = await fn(**fn_kwargs)
+        else:
+            download_results = await fn(**fn_kwargs)
+        return self.create_step_results(
+            current_file_data_path=file_data_path, download_results=download_results
+        )
+
     def create_step_results(
         self, current_file_data_path: str, download_results: download_responses
     ) -> list[DownloadStepResponse]:
@@ -87,35 +106,6 @@ class DownloadStep(PipelineStep):
         )
         return download_step_results

-    def _run(self, file_data_path: str) -> list[DownloadStepResponse]:
-        file_data = FileData.from_file(path=file_data_path)
-        download_path = self.process.get_download_path(file_data=file_data)
-        if not self.should_download(file_data=file_data, file_data_path=file_data_path):
-            logger.debug(f"Skipping download, file already exists locally: {download_path}")
-            return [DownloadStepResponse(file_data_path=file_data_path, path=str(download_path))]
-        download_results = self.process.run(file_data=file_data)
-
-        return self.create_step_results(
-            current_file_data_path=file_data_path, download_results=download_results
-        )
-
-    async def _run_async(self, file_data_path: str) -> list[DownloadStepResponse]:
-        file_data = FileData.from_file(path=file_data_path)
-        download_path = self.process.get_download_path(file_data=file_data)
-        if download_path and not self.should_download(
-            file_data=file_data, file_data_path=file_data_path
-        ):
-            logger.debug(f"Skipping download, file already exists locally: {download_path}")
-            return [DownloadStepResponse(file_data_path=file_data_path, path=str(download_path))]
-        if semaphore := self.context.semaphore:
-            async with semaphore:
-                download_results = await self.process.run_async(file_data=file_data)
-        else:
-            download_results = await self.process.run_async(file_data=file_data)
-
-        return self.create_step_results(
-            current_file_data_path=file_data_path, download_results=download_results
-        )
-
     def persist_new_file_data(self, file_data: FileData) -> str:
         record_hash = self.get_hash(extras=[file_data.identifier])
         filename = f"{record_hash}.json"
```

View File

```diff
@@ -1,8 +1,9 @@
+import asyncio
 import hashlib
 import json
 from dataclasses import dataclass
 from pathlib import Path
-from typing import Optional, TypedDict
+from typing import Callable, Optional, TypedDict

 from unstructured.ingest.v2.interfaces import FileData
 from unstructured.ingest.v2.logger import logger
@@ -53,33 +54,21 @@ class EmbedStep(PipelineStep):
             logger.debug(f"Writing embedded output to: {output_filepath}")
             json.dump(embedded_content, f, indent=2)

-    def _run(self, path: str, file_data_path: str) -> EmbedStepResponse:
-        path = Path(path)
-        file_data = FileData.from_file(path=file_data_path)
-        output_filepath = self.get_output_filepath(filename=path)
-        if not self.should_embed(filepath=output_filepath, file_data=file_data):
-            logger.debug(f"Skipping embedding, output already exists: {output_filepath}")
-            return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath))
-        embed_content_raw = self.process.run(elements_filepath=path)
-
-        self._save_output(
-            output_filepath=str(output_filepath),
-            embedded_content=elements_to_dicts(embed_content_raw),
-        )
-        return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath))
-
-    async def _run_async(self, path: str, file_data_path: str) -> EmbedStepResponse:
+    async def _run_async(self, fn: Callable, path: str, file_data_path: str) -> EmbedStepResponse:
         path = Path(path)
         file_data = FileData.from_file(path=file_data_path)
         output_filepath = self.get_output_filepath(filename=path)
         if not self.should_embed(filepath=output_filepath, file_data=file_data):
             logger.debug(f"Skipping embedding, output already exists: {output_filepath}")
             return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath))
-        if semaphore := self.context.semaphore:
+        fn_kwargs = {"elements_filepath": path}
+        if not asyncio.iscoroutinefunction(fn):
+            embed_content_raw = fn(**fn_kwargs)
+        elif semaphore := self.context.semaphore:
             async with semaphore:
-                embed_content_raw = await self.process.run_async(elements_filepath=path)
+                embed_content_raw = await fn(**fn_kwargs)
         else:
-            embed_content_raw = await self.process.run_async(elements_filepath=path)
+            embed_content_raw = await fn(**fn_kwargs)

         self._save_output(
             output_filepath=str(output_filepath),
```

View File

```diff
@@ -1,8 +1,9 @@
+import asyncio
 import hashlib
 import json
 from dataclasses import dataclass
 from pathlib import Path
-from typing import Optional, TypedDict
+from typing import Callable, Optional, TypedDict

 from unstructured.ingest.v2.interfaces import FileData
 from unstructured.ingest.v2.logger import logger
@@ -48,35 +49,23 @@ class PartitionStep(PipelineStep):
             logger.debug(f"Writing partitioned output to: {output_filepath}")
             json.dump(partitioned_content, f, indent=2)

-    def _run(self, path: str, file_data_path: str) -> PartitionStepResponse:
+    async def _run_async(
+        self, fn: Callable, path: str, file_data_path: str
+    ) -> PartitionStepResponse:
         path = Path(path)
         file_data = FileData.from_file(path=file_data_path)
         output_filepath = self.get_output_filepath(filename=Path(file_data_path))
         if not self.should_partition(filepath=output_filepath, file_data=file_data):
             logger.debug(f"Skipping partitioning, output already exists: {output_filepath}")
             return PartitionStepResponse(file_data_path=file_data_path, path=str(output_filepath))
-        partitioned_content = self.process.run(filename=path, metadata=file_data.metadata)
-        self._save_output(
-            output_filepath=str(output_filepath), partitioned_content=partitioned_content
-        )
-        return PartitionStepResponse(file_data_path=file_data_path, path=str(output_filepath))
-
-    async def _run_async(self, path: str, file_data_path: str) -> PartitionStepResponse:
-        path = Path(path)
-        file_data = FileData.from_file(path=file_data_path)
-        output_filepath = self.get_output_filepath(filename=Path(file_data_path))
-        if not self.should_partition(filepath=output_filepath, file_data=file_data):
-            logger.debug(f"Skipping partitioning, output already exists: {output_filepath}")
-            return PartitionStepResponse(file_data_path=file_data_path, path=str(output_filepath))
-        if semaphore := self.context.semaphore:
+        fn_kwargs = {"filename": path, "metadata": file_data.metadata}
+        if not asyncio.iscoroutinefunction(fn):
+            partitioned_content = fn(**fn_kwargs)
+        elif semaphore := self.context.semaphore:
             async with semaphore:
-                partitioned_content = await self.process.run_async(
-                    filename=path, metadata=file_data.metadata
-                )
+                partitioned_content = await fn(**fn_kwargs)
         else:
-            partitioned_content = await self.process.run_async(
-                filename=path, metadata=file_data.metadata
-            )
+            partitioned_content = await fn(**fn_kwargs)
         self._save_output(
             output_filepath=str(output_filepath), partitioned_content=partitioned_content
         )
```

View File

```diff
@@ -1,8 +1,9 @@
+import asyncio
 import hashlib
 import json
 from dataclasses import dataclass
 from pathlib import Path
-from typing import Optional, TypedDict
+from typing import Callable, Optional, TypedDict

 from unstructured.ingest.v2.interfaces.file_data import FileData
 from unstructured.ingest.v2.interfaces.upload_stager import UploadStager
@@ -35,33 +36,23 @@ class UploadStageStep(PipelineStep):
         self.cache_dir.mkdir(parents=True, exist_ok=True)
         logger.info(f"Created {self.identifier} with configs: {config}")

-    def _run(self, path: str, file_data_path: str) -> UploadStageStepResponse:
+    async def _run_async(
+        self, fn: Callable, path: str, file_data_path: str
+    ) -> UploadStageStepResponse:
         path = Path(path)
-        staged_output_path = self.process.run(
-            elements_filepath=path,
-            file_data=FileData.from_file(path=file_data_path),
-            output_dir=self.cache_dir,
-            output_filename=self.get_hash(extras=[path.name]),
-        )
-        return UploadStageStepResponse(file_data_path=file_data_path, path=str(staged_output_path))
-
-    async def _run_async(self, path: str, file_data_path: str) -> UploadStageStepResponse:
-        path = Path(path)
-        if semaphore := self.context.semaphore:
+        fn_kwargs = {
+            "elements_filepath": path,
+            "file_data": FileData.from_file(path=file_data_path),
+            "output_dir": self.cache_dir,
+            "output_filename": self.get_hash(extras=[path.name]),
+        }
+        if not asyncio.iscoroutinefunction(fn):
+            staged_output_path = fn(**fn_kwargs)
+        elif semaphore := self.context.semaphore:
             async with semaphore:
-                staged_output_path = await self.process.run_async(
-                    elements_filepath=path,
-                    file_data=FileData.from_file(path=file_data_path),
-                    output_dir=self.cache_dir,
-                    output_filename=self.get_hash(extras=[path.name]),
-                )
+                staged_output_path = await fn(**fn_kwargs)
         else:
-            staged_output_path = await self.process.run_async(
-                elements_filepath=path,
-                file_data=FileData.from_file(path=file_data_path),
-                output_dir=self.cache_dir,
-                output_filename=self.get_hash(extras=[path.name]),
-            )
+            staged_output_path = await fn(**fn_kwargs)
         return UploadStageStepResponse(file_data_path=file_data_path, path=str(staged_output_path))

     def get_hash(self, extras: Optional[list[str]]) -> str:
```

View File

```diff
@@ -1,5 +1,6 @@
+import asyncio
 from pathlib import Path
-from typing import TypedDict
+from typing import Callable, TypedDict

 from unstructured.ingest.v2.interfaces.file_data import FileData
 from unstructured.ingest.v2.logger import logger
@@ -42,13 +43,18 @@ class UncompressStep(PipelineStep):
             )
         return responses

-    async def _run_async(self, path: str, file_data_path: str) -> list[UncompressStepResponse]:
+    async def _run_async(
+        self, fn: Callable, path: str, file_data_path: str
+    ) -> list[UncompressStepResponse]:
         file_data = FileData.from_file(path=file_data_path)
-        if semaphore := self.context.semaphore:
+        fn_kwargs = {"file_data": file_data}
+        if not asyncio.iscoroutinefunction(fn):
+            new_file_data = fn(**fn_kwargs)
+        elif semaphore := self.context.semaphore:
             async with semaphore:
-                new_file_data = await self.process.run_async(file_data=file_data)
+                new_file_data = await fn(**fn_kwargs)
         else:
-            new_file_data = await self.process.run_async(file_data=file_data)
+            new_file_data = await fn(**fn_kwargs)
         responses = []
         for new_file in new_file_data:
             new_file_data_path = Path(file_data_path).parent / f"{new_file.identifier}.json"
```

View File

```diff
@@ -1,7 +1,7 @@
 import asyncio
 from dataclasses import dataclass
 from pathlib import Path
-from typing import TypedDict
+from typing import Callable, Optional, TypedDict

 from unstructured.ingest.v2.interfaces import FileData
 from unstructured.ingest.v2.interfaces.uploader import UploadContent, Uploader
@@ -42,7 +42,7 @@ class UploadStep(PipelineStep):
         )

     def process_whole(self, iterable: iterable_input):
-        self.run(iterable)
+        self.run(contents=iterable)

     async def _process_async(self, iterable: iterable_input):
         return await asyncio.gather(*[self.run_async(**i) for i in iterable])
@@ -60,20 +60,20 @@ class UploadStep(PipelineStep):
         else:
             self.process_whole(iterable=iterable)

-    def _run(self, contents: list[UploadStepContent]):
+    def _run(self, fn: Callable, contents: list[UploadStepContent]):
         upload_contents = [
             UploadContent(path=Path(c["path"]), file_data=FileData.from_file(c["file_data_path"]))
             for c in contents
         ]
-        self.process.run(contents=upload_contents)
+        fn(contents=upload_contents)

-    async def _run_async(self, path: str, file_data_path: str):
-        if semaphore := self.context.semaphore:
-            with semaphore:
-                await self.process.run_async(
-                    path=Path(path), file_data=FileData.from_file(path=file_data_path)
-                )
+    async def _run_async(self, path: str, file_data_path: str, fn: Optional[Callable] = None):
+        fn = fn or self.process.run_async
+        fn_kwargs = {"path": Path(path), "file_data": FileData.from_file(path=file_data_path)}
+        if not asyncio.iscoroutinefunction(fn):
+            fn(**fn_kwargs)
+        elif semaphore := self.context.semaphore:
+            async with semaphore:
+                await fn(**fn_kwargs)
         else:
-            await self.process.run_async(
-                path=Path(path), file_data=FileData.from_file(path=file_data_path)
-            )
+            await fn(**fn_kwargs)
```

View File

```diff
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import asyncio
 import functools
 import html
 import importlib
@@ -227,8 +228,7 @@ def requires_dependencies(
         dependencies = [dependencies]

     def decorator(func: Callable[_P, _T]) -> Callable[_P, _T]:
-        @wraps(func)
-        def wrapper(*args: _P.args, **kwargs: _P.kwargs):
+        def run_check():
             missing_deps: List[str] = []
             for dep in dependencies:
                 if not dependency_exists(dep):
@@ -242,8 +242,19 @@ def requires_dependencies(
                     else f"Please install them using `pip install {' '.join(missing_deps)}`."
                 ),
             )
+
+        @wraps(func)
+        def wrapper(*args: _P.args, **kwargs: _P.kwargs):
+            run_check()
             return func(*args, **kwargs)

+        @wraps(func)
+        async def wrapper_async(*args: _P.args, **kwargs: _P.kwargs):
+            run_check()
+            return await func(*args, **kwargs)
+
+        if asyncio.iscoroutinefunction(func):
+            return wrapper_async
         return wrapper

     return decorator
```