Skip to content

Commit

Permalink
Update main.py
Browse files Browse the repository at this point in the history
  • Loading branch information
tilltmk authored Jul 28, 2024
1 parent be9969a commit f96229d
Showing 1 changed file with 108 additions and 76 deletions.
184 changes: 108 additions & 76 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import zlib
import zstandard as zstd
from tkinter import filedialog, messagebox, Tk, END
from tqdm import tqdm
from tqdm.auto import tqdm
from datetime import datetime
import threading
import io
Expand All @@ -18,7 +18,7 @@ def extract_zip_excluding(input_zip, output_dir, exclude_file, password=None, ou
members = zip_ref.namelist()
total_members = len(members)

with tqdm(total=total_members, unit='file', desc="Extracting", ncols=100, file=output_func) as progress_bar:
with tqdm(total=total_members, unit='file', desc="Extracting", ncols=70, file=output_func) as progress_bar:
for member in members:
if member != exclude_file:
try:
Expand All @@ -42,7 +42,7 @@ def compress_file(input_path, output_path, compression_level=3, output_func=None
cctx = zstd.ZstdCompressor(level=compression_level)

with open(input_path, 'rb') as input_file, open(output_path, 'wb') as output_file:
with tqdm(total=file_size, unit='B', unit_scale=True, desc='Compressing', ncols=100, file=output_func) as pbar:
with tqdm(total=file_size, unit='B', unit_scale=True, desc='Compressing', ncols=70, file=output_func) as pbar:
def update_progress(chunk):
pbar.update(len(chunk))

Expand All @@ -62,7 +62,7 @@ def compress_folder(input_folder, output_path, compression_level=3, output_func=
with open(output_path, 'wb') as output_file:
cctx = zstd.ZstdCompressor(level=compression_level)
with cctx.stream_writer(output_file) as compressor:
with tqdm(total=total_files, unit='file', desc='Compressing', ncols=100, file=output_func) as pbar:
with tqdm(total=total_files, unit='file', desc='Compressing', ncols=70, file=output_func) as pbar:
files_processed = 0

for root, _, files in os.walk(input_folder):
Expand All @@ -83,15 +83,26 @@ class TextRedirector(io.StringIO):
def __init__(self, text_widget):
super().__init__()
self.text_widget = text_widget
self.last_line = ""

def write(self, s):
self.text_widget.insert(END, s)
# Auto-scroll to the end
if '\r' in s:
self.text_widget.delete("end-1c linestart", "end")
self.last_line = s.split('\r')[-1]
else:
self.last_line += s

if '\n' in s:
self.text_widget.insert(END, self.last_line)
self.last_line = ""

self.text_widget.see(END)

def flush(self):
pass

if self.last_line:
self.text_widget.insert(END, self.last_line)
self.last_line = ""
self.text_widget.see(END)

# GUI classes and functions
class App(ctk.CTk):
Expand All @@ -114,11 +125,19 @@ def create_widgets(self):
self.compress_tab = self.tabview.add("Compress")
self.decode_zst_tab = self.tabview.add("Decode Zstd")

# Extract Tab
self.create_extract_tab()
self.create_compress_tab()
self.create_decode_zst_tab()

# Quit Button
self.quit_button = ctk.CTkButton(self, text="Exit", command=self.quit_program)
self.quit_button.pack(pady=10)

def create_extract_tab(self):
self.extract_frame = ctk.CTkFrame(self.extract_tab)
self.extract_frame.pack(fill="both", expand=True, padx=20, pady=20)

self.extract_zip_button = ctk.CTkButton(self.extract_frame, text="Retrieve Zip-File", command=self.extract_zip)
self.extract_zip_button = ctk.CTkButton(self.extract_frame, text="Retrieve Zip-File", command=self.select_zip_file)
self.extract_zip_button.pack(pady=10)

self.exclude_file_entry = ctk.CTkEntry(self.extract_frame, placeholder_text="Exclude File")
Expand All @@ -127,14 +146,12 @@ def create_widgets(self):
self.password_entry = ctk.CTkEntry(self.extract_frame, placeholder_text="Password", show="*")
self.password_entry.pack(pady=10)

# Terminal Frame
self.terminal_frame = ctk.CTkFrame(self.extract_frame)
self.terminal_frame.pack(fill="both", expand=True, padx=10, pady=10)
self.extract_start_button = ctk.CTkButton(self.extract_frame, text="Start Extraction", command=self.start_extraction)
self.extract_start_button.pack(pady=10)

self.terminal_output = ctk.CTkTextbox(self.terminal_frame, font=("Courier", 12))
self.terminal_output.pack(fill="both", expand=True)
self.create_terminal(self.extract_frame)

# Compress Tab
def create_compress_tab(self):
self.compress_frame = ctk.CTkFrame(self.compress_tab)
self.compress_frame.pack(fill="both", expand=True, padx=20, pady=20)

Expand All @@ -151,7 +168,12 @@ def create_widgets(self):
self.output_path_entry = ctk.CTkEntry(self.compress_frame, placeholder_text="Output Path")
self.output_path_entry.pack(pady=10)

# Decode Zstd Tab
self.compress_start_button = ctk.CTkButton(self.compress_frame, text="Start Compression", command=self.start_compression)
self.compress_start_button.pack(pady=10)

self.create_terminal(self.compress_frame)

def create_decode_zst_tab(self):
self.decode_zst_frame = ctk.CTkFrame(self.decode_zst_tab)
self.decode_zst_frame.pack(fill="both", expand=True, padx=20, pady=20)

Expand All @@ -160,97 +182,107 @@ def create_widgets(self):

self.decode_output_path_entry = ctk.CTkEntry(self.decode_zst_frame, placeholder_text="Output Path")
self.decode_output_path_entry.pack(pady=10)

# Quit Button
self.quit_button = ctk.CTkButton(self, text="Exit", command=self.quit_program)
self.quit_button.pack(pady=10)

def extract_zip(self):
input_zip = filedialog.askopenfilename(title="Select Zip File", filetypes=[("Zip files", "*.zip")])
if not input_zip:
return

output_dir = filedialog.askdirectory(title="Select Output Directory")
if not output_dir:
self.decode_start_button = ctk.CTkButton(self.decode_zst_frame, text="Start Decoding", command=self.start_decoding)
self.decode_start_button.pack(pady=10)

self.create_terminal(self.decode_zst_frame)

def create_terminal(self, parent):
terminal_frame = ctk.CTkFrame(parent)
terminal_frame.pack(fill="both", expand=True, padx=10, pady=10)

self.terminal_output = ctk.CTkTextbox(terminal_frame, font=("Courier", 12))
self.terminal_output.pack(fill="both", expand=True)

def clear_terminal(self):
self.terminal_output.delete("1.0", END)

def select_zip_file(self):
self.input_zip = filedialog.askopenfilename(title="Select Zip File", filetypes=[("Zip files", "*.zip")])
if self.input_zip:
self.output_dir = filedialog.askdirectory(title="Select Output Directory")

def start_extraction(self):
if not hasattr(self, 'input_zip') or not hasattr(self, 'output_dir'):
messagebox.showerror("Error", "Please select a zip file and output directory first.")
return

exclude_file = self.exclude_file_entry.get()
password = self.password_entry.get() or None

start_time = datetime.now()

thread = threading.Thread(target=lambda: self.run_extract_zip(input_zip, output_dir, exclude_file, password, start_time))
thread = threading.Thread(target=lambda: self.run_extract_zip(self.input_zip, self.output_dir, exclude_file, password, start_time))
thread.start()

def run_extract_zip(self, input_zip, output_dir, exclude_file, password, start_time):
success = extract_zip_excluding(input_zip, output_dir, exclude_file, password, output_func=TextRedirector(self.terminal_output))
duration = datetime.now() - start_time
if success:
messagebox.showinfo("Success", f"Extraction completed successfully in {duration}!")
else:
messagebox.showerror("Error", "Extraction failed.")

def select_compress_file(self):
input_path = filedialog.askopenfilename(title="Select File to Compress", filetypes=[("All files", "*.*")])
if not input_path:
return

output_path = filedialog.asksaveasfilename(title="Save Compressed File As", defaultextension=".zst", filetypes=[("Zstandard compressed files", "*.zst")])
if not output_path:
self.input_path = filedialog.askopenfilename(title="Select File to Compress", filetypes=[("All files", "*.*")])
if self.input_path:
self.output_path = filedialog.asksaveasfilename(title="Save Compressed File As", defaultextension=".zst", filetypes=[("Zstandard compressed files", "*.zst")])

def select_compress_folder(self):
self.input_path = filedialog.askdirectory(title="Select Folder to Compress")
if self.input_path:
self.output_path = filedialog.asksaveasfilename(title="Save Compressed File As", defaultextension=".zst", filetypes=[("Zstandard compressed files", "*.zst")])

def start_compression(self):
if not hasattr(self, 'input_path') or not hasattr(self, 'output_path'):
messagebox.showerror("Error", "Please select an input file/folder and output path first.")
return

compression_level = int(self.compression_level_entry.get())

start_time = datetime.now()

thread = threading.Thread(target=lambda: self.run_compress_file(input_path, output_path, compression_level, start_time))
if os.path.isfile(self.input_path):
thread = threading.Thread(target=lambda: self.run_compress_file(self.input_path, self.output_path, compression_level, start_time))
else:
thread = threading.Thread(target=lambda: self.run_compress_folder(self.input_path, self.output_path, compression_level, start_time))
thread.start()

def run_compress_file(self, input_path, output_path, compression_level, start_time):
compress_file(input_path, output_path, compression_level, output_func=TextRedirector(self.terminal_output))
duration = datetime.now() - start_time
messagebox.showinfo("Success", f"Compression completed successfully in {duration}!")
def select_zst_file(self):
self.input_path = filedialog.askopenfilename(title="Select Zstandard File", filetypes=[("Zstandard files", "*.zst")])
if self.input_path:
self.output_path = filedialog.asksaveasfilename(title="Select Output File", defaultextension="", filetypes=[("All files", "*.*")])

def select_compress_folder(self):
input_folder = filedialog.askdirectory(title="Select Folder to Compress")
if not input_folder:
def start_decoding(self):
if not hasattr(self, 'input_path') or not hasattr(self, 'output_path'):
messagebox.showerror("Error", "Please select an input file and output path first.")
return

output_path = filedialog.asksaveasfilename(title="Save Compressed File As", defaultextension=".zst", filetypes=[("Zstandard compressed files", "*.zst")])
if not output_path:
return

compression_level = int(self.compression_level_entry.get())


start_time = datetime.now()

thread = threading.Thread(target=lambda: self.run_compress_folder(input_folder, output_path, compression_level, start_time))
thread = threading.Thread(target=lambda: self.run_extract_zst(self.input_path, self.output_path, start_time))
thread.start()

def run_compress_folder(self, input_folder, output_path, compression_level, start_time):
self.clear_terminal()
compress_folder(input_folder, output_path, compression_level, output_func=TextRedirector(self.terminal_output))
duration = datetime.now() - start_time
messagebox.showinfo("Success", f"Compression completed successfully in {duration}!")

def select_zst_file(self):
input_path = filedialog.askopenfilename(title="Select Zstandard File", filetypes=[("Zstandard files", "*.zst")])
if not input_path:
return

output_path = filedialog.asksaveasfilename(title="Select Output File", defaultextension="", filetypes=[("All files", "*.*")])
if not output_path:
return

start_time = datetime.now()

thread = threading.Thread(target=lambda: self.run_extract_zst(input_path, output_path, start_time))
thread.start()

def run_extract_zst(self, input_path, output_path, start_time):
self.clear_terminal()
extract_zst(input_path, output_path, output_func=TextRedirector(self.terminal_output))
duration = datetime.now() - start_time
messagebox.showinfo("Success", f"Extraction completed successfully in {duration}!")

def run_compress_file(self, input_path, output_path, compression_level, start_time):
self.clear_terminal()
compress_file(input_path, output_path, compression_level, output_func=TextRedirector(self.terminal_output))
duration = datetime.now() - start_time
messagebox.showinfo("Success", f"Compression completed successfully in {duration}!")

def run_extract_zip(self, input_zip, output_dir, exclude_file, password, start_time):
self.clear_terminal()
success = extract_zip_excluding(input_zip, output_dir, exclude_file, password, output_func=TextRedirector(self.terminal_output))
duration = datetime.now() - start_time
if success:
messagebox.showinfo("Success", f"Extraction completed successfully in {duration}!")
else:
messagebox.showerror("Error", "Extraction failed.")

def quit_program(self):
self.quit()
self.destroy()
Expand Down

0 comments on commit f96229d

Please sign in to comment.