-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a58f2b2
commit 3e1bec3
Showing
6 changed files
with
88 additions
and
42 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
This is not a valid zip file |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,25 +1,10 @@ | ||
""" | ||
Functions to ingest and process data | ||
Modularized Data pipeline to form DAGs in the future | ||
""" | ||
import zipfile | ||
|
||
from .download_data import ingest_data | ||
|
||
|
||
def unzip_file(zip_filename='data/data.zip', extract_to='data/'):
    """
    Function to unzip the downloaded data
    Args:
        zip_filename: path of the zip archive to extract; defaults to
            'data/data.zip'
        extract_to: directory the archive is extracted into; defaults
            to 'data/'
    Returns:
        extract_to: the extraction directory, returned unchanged. It is
            returned even when the archive is rejected as a bad zip file,
            in which case only a failure message is printed.
    """
    try:
        with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
    except zipfile.BadZipFile:
        print(f"Failed to unzip {zip_filename}")
    else:
        # Report success only once extraction has completed without error.
        print(f"File {zip_filename} successfully unzipped to {extract_to}")
    return extract_to
from .unzip_data import unzip_file | ||
|
||
|
||
if __name__ == "__main__":
    # Download the archive, then extract it exactly once using the value
    # returned by ingest_data. An extra no-argument unzip_file() call before
    # this would only extract the default archive a second time.
    ZIPFILE_PATH = ingest_data("https://archive.ics.uci.edu/static/public/352/online+retail.zip")
    UNZIPPED_FILE = unzip_file(ZIPFILE_PATH, 'data/')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
""" | ||
Function to unzip data and make it available | ||
""" | ||
import zipfile | ||
|
||
# Default locations used when the caller does not supply its own paths.
ZIP_FILENAME = 'data/data.zip'
EXTRACT_TO = 'data/'


def unzip_file(zip_filename=ZIP_FILENAME, extract_to=EXTRACT_TO):
    """
    Function to unzip the downloaded data
    Args:
        zip_filename: zipfile path, a default is used if not specified
        extract_to: Path where the unzipped and extracted data is available
    Returns:
        extract_to: filepath where the data is available. The same value is
            returned when the archive is rejected as a bad zip file; in that
            case a "Failed to unzip" message with the reason is printed and
            nothing is extracted.
    """
    try:
        with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
    except zipfile.BadZipFile as err:
        # Keep the best-effort behaviour (no re-raise), but surface the reason
        # so a failed extraction can be diagnosed from the output.
        print(f"Failed to unzip {zip_filename}: {err}")
    else:
        # Only the extraction itself is guarded by the try block.
        print(f"File {zip_filename} successfully unzipped to {extract_to}")
    return extract_to
|
||
if __name__ == "__main__":
    # Script entry point: extract the default archive into the default folder.
    UNZIPPED_FILE = unzip_file(zip_filename=ZIP_FILENAME, extract_to=EXTRACT_TO)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,3 @@ | ||
""" | ||
Tests for datapipeline functions | ||
""" | ||
from src import datapipeline | ||
|
||
def test_unzip_file(mocker):
    """
    Tests for unzip_file()

    ZipFile and print are replaced with mocks, so no archive is read and
    nothing is written to disk or stdout.
    """

    # arrange:
    # mocked dependencies

    mock_zipfile = mocker.MagicMock(name='ZipFile')
    mocker.patch('src.datapipeline.zipfile.ZipFile', new=mock_zipfile)

    mock_print = mocker.MagicMock(name='print')
    mocker.patch('src.datapipeline.print', new=mock_print)

    # act: invoking the tested code
    datapipeline.unzip_file()

    # assert:
    # The default archive is opened for reading ...
    mock_zipfile.assert_called_once_with('data/data.zip', 'r')

    # ... its contents are extracted into the default directory (the object
    # bound by the `with` statement is the mock's __enter__ return value) ...
    zip_ref = mock_zipfile.return_value.__enter__.return_value
    zip_ref.extractall.assert_called_once_with('data/')

    # ... and only the success message is printed.
    mock_print.assert_called_once_with(
        "File data/data.zip successfully unzipped to data/"
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
""" | ||
Function to test the unzip_data functions | ||
""" | ||
from src import unzip_data | ||
|
||
# Define constants or variables for testing | ||
ZIP_FILENAME = 'data/data.zip' | ||
EXTRACT_TO = 'data/' | ||
BAD_ZIP_FILENAME = 'data/bad.zip' | ||
|
||
def test_unzip_file_successful(tmp_path):
    """
    Test for successful unzipping
    """
    # Imported here so the test brings its own dependency into scope.
    import zipfile

    # Create a temporary directory for testing
    test_dir = tmp_path / "test_dir"
    test_dir.mkdir()

    # Build a small archive inside tmp_path so the test does not depend on
    # data/data.zip being present in the working tree.
    zip_file = tmp_path / "data.zip"
    with zipfile.ZipFile(zip_file, "w") as archive:
        archive.writestr("sample.txt", "sample content")

    # Call the function
    result = unzip_data.unzip_file(zip_file, test_dir)

    # Check if the function returned the expected extract_to path
    assert result == test_dir

    # Check that the archive member was actually extracted
    extracted = test_dir / "sample.txt"
    assert extracted.read_text(encoding="utf-8") == "sample content"
|
||
|
||
def test_unzip_file_bad_zip(tmp_path, capsys):
    """
    Test for handling a bad zip file
    """
    # Create a temporary directory for testing
    test_dir = tmp_path / "test_dir"
    test_dir.mkdir()

    # Create a bad zip file in the temporary directory, so the test leaves
    # nothing behind in the working tree.
    bad_zip = tmp_path / "bad.zip"
    bad_zip.write_bytes(b"This is not a valid zip file")

    # Call the function with the bad zip file
    result = unzip_data.unzip_file(bad_zip, test_dir)

    # Check if the function returned the extract_to path
    assert result == test_dir

    # Check if the function printed the appropriate error message
    captured = capsys.readouterr()
    assert "Failed to unzip" in captured.out

    # Nothing should have been extracted from an invalid archive
    assert not any(test_dir.iterdir())