import unittest from io import BytesIO from unittest.mock import MagicMock, patch import pandas as pd from datahub.ingestion.source.excel.excel_file import ExcelFile, ExcelTable from datahub.ingestion.source.excel.report import ExcelSourceReport class TestExcelFile(unittest.TestCase): def setUp(self): self.report = ExcelSourceReport() self.filename = "test_file.xlsx" self.bytes_data = BytesIO(b"mock bytes data") self.bytes_data.seek(0) self.mock_wb = MagicMock() self.mock_wb.sheetnames = ["Sheet1", "Sheet2"] self.mock_wb.active.title = "Sheet1" self.mock_properties = MagicMock() self.mock_properties.title = "Test Title" self.mock_properties.creator = "Test Creator" self.mock_properties.subject = "Test Subject" self.mock_properties.description = "Test Description" self.mock_properties.keywords = "Test Keywords" self.mock_properties.category = "Test Category" self.mock_properties.lastModifiedBy = "Test Last Modified By" self.mock_properties.created = "2023-01-01" self.mock_properties.modified = "2023-01-02" self.mock_properties.contentStatus = "Test Status" self.mock_properties.revision = "Test Revision" self.mock_properties.version = "Test Version" self.mock_properties.language = "Test Language" self.mock_properties.identifier = "Test Identifier" self.mock_wb.properties = self.mock_properties @patch("openpyxl.load_workbook") def test_load_workbook_with_readable_seekable_bytesio(self, mock_load_workbook): mock_load_workbook.return_value = self.mock_wb mock_file = self.bytes_data excel_file = ExcelFile(self.filename, mock_file, self.report) result = excel_file.load_workbook() self.assertTrue(result) mock_load_workbook.assert_called_once() @patch("openpyxl.load_workbook") def test_load_workbook_with_invalid_data_type(self, mock_load_workbook): mock_load_workbook.side_effect = TypeError("Invalid data type") excel_file = ExcelFile(self.filename, 123, self.report) # type: ignore result = excel_file.load_workbook() self.assertFalse(result) self.assertEqual(len(self.report.filtered), 1) self.assertIn(self.filename, self.report.filtered) @patch("openpyxl.load_workbook") def test_load_workbook_with_exception(self, mock_load_workbook): mock_load_workbook.side_effect = Exception("Test exception") excel_file = ExcelFile(self.filename, self.bytes_data, self.report) result = excel_file.load_workbook() self.assertFalse(result) self.assertEqual(len(self.report.filtered), 1) self.assertIn(self.filename, self.report.filtered) def test_sheet_names_property(self): excel_file = ExcelFile(self.filename, self.bytes_data, self.report) excel_file.sheet_list = ["Sheet1", "Sheet2"] sheet_names = excel_file.sheet_names self.assertEqual(sheet_names, ["Sheet1", "Sheet2"]) def test_active_sheet_name_property(self): excel_file = ExcelFile(self.filename, self.bytes_data, self.report) excel_file.active_sheet = "Sheet1" active_sheet = excel_file.active_sheet_name self.assertEqual(active_sheet, "Sheet1") def test_workbook_properties_property(self): excel_file = ExcelFile(self.filename, self.bytes_data, self.report) excel_file.properties = {"title": "Test Title"} properties = excel_file.workbook_properties self.assertEqual(properties, {"title": "Test Title"}) @patch("openpyxl.load_workbook") def test_get_tables(self, mock_load_workbook): mock_load_workbook.return_value = self.mock_wb excel_file = ExcelFile(self.filename, self.bytes_data, self.report) excel_file.load_workbook() excel_file.get_table = MagicMock() # type: ignore mock_table1 = ExcelTable( df=pd.DataFrame(), header_row=1, footer_row=5, row_count=4, column_count=3, metadata={}, sheet_name="Sheet1", ) mock_table2 = ExcelTable( df=pd.DataFrame(), header_row=1, footer_row=5, row_count=4, column_count=3, metadata={}, sheet_name="Sheet2", ) excel_file.get_table.side_effect = [mock_table1, mock_table2] tables = list(excel_file.get_tables()) self.assertEqual(len(tables), 2) self.assertEqual(tables[0].sheet_name, "Sheet1") self.assertEqual(tables[1].sheet_name, "Sheet2") excel_file.get_table.assert_any_call("Sheet1") excel_file.get_table.assert_any_call("Sheet2") @patch("openpyxl.load_workbook") def test_get_table(self, mock_load_workbook): mock_sheet = MagicMock() mock_cell1 = MagicMock() mock_cell1.value = "Header1" mock_cell2 = MagicMock() mock_cell2.value = "Header2" mock_cell3 = MagicMock() mock_cell3.value = None row1 = [mock_cell1, mock_cell2, mock_cell3] mock_cell4 = MagicMock() mock_cell4.value = "Data1" mock_cell5 = MagicMock() mock_cell5.value = "Data2" mock_cell6 = MagicMock() mock_cell6.value = None row2 = [mock_cell4, mock_cell5, mock_cell6] mock_sheet.rows = [row1, row2] mock_sheet.title = "TestSheet" mock_wb = MagicMock() mock_wb.__getitem__.return_value = mock_sheet mock_wb.properties = self.mock_properties with patch( "datahub.ingestion.source.excel.excel_file.ExcelFile.find_header_row", return_value=0, ), patch( "datahub.ingestion.source.excel.excel_file.ExcelFile.find_footer_start", return_value=2, ), patch( "datahub.ingestion.source.excel.excel_file.ExcelFile.extract_metadata", return_value={}, ): excel_file = ExcelFile(self.filename, self.bytes_data, self.report) excel_file.wb = mock_wb excel_file.properties = excel_file.read_excel_properties(mock_wb) table = excel_file.get_table("TestSheet") assert table is not None self.assertIsInstance(table, ExcelTable) self.assertEqual(table.sheet_name, "TestSheet") self.assertEqual(table.header_row, 1) self.assertEqual(table.footer_row, 2) self.assertEqual(list(table.df.columns), ["Header1", "Header2"]) def test_find_header_row(self): test_cases = [ { "rows": [ ["Title", None], ["Name", "Age", "City"], ["John", 25, "New York"], ["Jane", 30, "Boston"], ], "expected": 1, }, { "rows": [ ["Name", "Age", "City"], ["John", 25, "New York"], ["Jane", 30, "Boston"], ["Tom", 45, "Chicago"], ], "expected": 0, }, { "rows": [ ["Report", "Sales Summary"], ["Date", "2023-01-01"], [None, None, None], ["Product", "Quantity", "Revenue"], ["Item1", 100, 1000], ["Item2", 200, 2000], ["Item3", 300, 3000], ], "expected": 3, }, { "rows": [ [None, None, None], [None, None, None], ], "expected": 0, }, ] excel_file = ExcelFile(self.filename, self.bytes_data, self.report) for i, case in enumerate(test_cases): result = excel_file.find_header_row(case["rows"]) # type: ignore self.assertEqual(result, case["expected"], f"Test case {i + 1} failed") def test_find_footer_start(self): test_cases = [ { "rows": [ ["Name", "Age", "City"], ["John", 25, "New York"], ["Jane", 30, "Boston"], ["Total", 55, None], ], "header_row_idx": 0, "expected": 3, }, { "rows": [ ["Product", "Quantity", "Revenue"], ["Item1", 100, 1000], ["Item2", 200, 2000], [None, None, None], ["Note: Data is preliminary"], ], "header_row_idx": 0, "expected": 3, }, { "rows": [ ["ID", "Value"], [1, 100], [2, 200], ], "header_row_idx": 0, "expected": 3, }, ] for i, case in enumerate(test_cases): result = ExcelFile.find_footer_start(case["rows"], case["header_row_idx"]) # type: ignore self.assertEqual(result, case["expected"], f"Test case {i + 1} failed") def test_extract_metadata(self): test_cases = [ { "rows": [ ["Title", "Sales Report"], ["Date", "2023-01-01"], ["Author", "John Doe"], ], "expected": { "Title": "Sales Report", "Date": "2023-01-01", "Author": "John Doe", }, }, { "rows": [ ["Report Title:", "Q1 Performance"], ["Generated on:", "2023-04-01"], ], "expected": { "Report Title": "Q1 Performance", "Generated on": "2023-04-01", }, }, { "rows": [ ["Title", "Monthly Summary"], [None, None], ["Data", "Value1", "Value2"], ], "expected": {"Title": "Monthly Summary"}, }, ] for i, case in enumerate(test_cases): result = ExcelFile.extract_metadata(case["rows"]) # type: ignore self.assertEqual(result, case["expected"], f"Test case {i + 1} failed") # type: ignore def test_read_excel_properties(self): mock_wb = MagicMock() mock_wb.properties = self.mock_properties properties = ExcelFile.read_excel_properties(mock_wb) self.assertEqual(properties["title"], "Test Title") self.assertEqual(properties["author"], "Test Creator") self.assertEqual(properties["subject"], "Test Subject") self.assertEqual(properties["description"], "Test Description") mock_wb.custom_doc_props = MagicMock() mock_custom_prop1 = MagicMock() mock_custom_prop1.name = "CustomProp1" mock_custom_prop1.value = "Value1" mock_custom_prop2 = MagicMock() mock_custom_prop2.name = "title" mock_custom_prop2.value = "Custom Title" mock_wb.custom_doc_props.props = [mock_custom_prop1, mock_custom_prop2] properties = ExcelFile.read_excel_properties(mock_wb) self.assertEqual(properties["CustomProp1"], "Value1") self.assertEqual(properties["title"], "Test Title") self.assertEqual(properties["custom.title"], "Custom Title")