From 59595344473ca40bf86cd5881f7c54793cda36c7 Mon Sep 17 00:00:00 2001 From: "friedemann.blume" Date: Sat, 20 Jul 2024 10:17:52 +0200 Subject: [PATCH] v3 - fix render all html content as well, use chunks and multithreading --- .DS_Store | Bin 6148 -> 6148 bytes mbox_to_markdown.py | 82 +++++++++++++++++++++++++++++++++++++++----- 2 files changed, 73 insertions(+), 9 deletions(-) diff --git a/.DS_Store b/.DS_Store index efee48e6147d2c22037b73fd0d3e423de13fb31a..9522d7ca61bca47b7c9a4c44b0926db8194d05c4 100644 GIT binary patch delta 298 zcmZoMXfc=|#>B!ku~2NHo+6{b#(>?7iw`g}F$zrPVN#jQ&m=04lWrKCoS$33z`(%x z`?Q%0P(nf^H{Zo2DJMS(D9Evs_1eEI)nkrOi4?370tFe!#yBuc)@72ZXJg1`C}k*N zC;*X27BMiNSg;6aHi|s5oi5LKFI)lIiKzx*JuWo}m#{G~uroI>Y<6Rs$h4WAgP#K! c5}P+Ne`lV|FXG6-$iTn^G@oH}gvc6Z0C$E@0RR91 delta 102 zcmZoMXfc=|#>B)qu~2NHo+2a1#(>?7j2x32StOKFij#Aa^7C^T85kJY81flP8A=!m uKqOpZa~bPIri~2-%$wOc_&I>4Y!>AB&ODi4#E}E2hXJUOWpjkc8fE}zdKonU diff --git a/mbox_to_markdown.py b/mbox_to_markdown.py index 123d262..0e54a82 100644 --- a/mbox_to_markdown.py +++ b/mbox_to_markdown.py @@ -8,13 +8,18 @@ from markdownify import markdownify from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from tqdm import tqdm +from tempfile import mkdtemp +import shutil +from concurrent.futures import ThreadPoolExecutor, as_completed # Configuration input_dir = '/mnt/input' output_dir = '/mnt/output' +chunk_size = 100 # Number of emails per chunk +max_workers = 4 # Number of threads # Setup logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Ensure output directory exists @@ -38,7 +43,9 @@ def format_date(date_str): def extract_email_content(email): """Extract the email content, prioritizing text/plain over text/html.""" + logger.debug("Extracting email content") if email.is_multipart(): + logger.debug("Email is multipart") for part in email.walk(): content_type = part.get_content_type() disposition = str(part.get('Content-Disposition')) @@ -47,6 +54,7 @@ def extract_email_content(email): elif content_type == 'text/html' and 'attachment' not in disposition: return markdownify(part.get_payload(decode=True).decode(errors='ignore')) else: + logger.debug("Email is not multipart") content_type = email.get_content_type() if content_type == 'text/plain': return email.get_payload(decode=True).decode(errors='ignore') @@ -85,28 +93,84 @@ def save_email_as_markdown(email, index, output_subdir): except Exception as e: logger.error(f"Error processing email {index + 1}: {e}") +def split_mbox(mbox_file): + """Split the mbox file into smaller chunks.""" + logger.debug("Splitting mbox file") + base_name = os.path.basename(mbox_file) + subdir_name = os.path.splitext(base_name)[0] # Remove the .mbox extension + temp_dir = mkdtemp() + mbox = mailbox.mbox(mbox_file) + + chunks = [] + chunk_index = 0 + email_index = 0 + + current_chunk = None + chunk_path = None + + try: + for email in mbox: + if email_index % chunk_size == 0: + if current_chunk is not None: + current_chunk.close() + chunk_path = os.path.join(temp_dir, f"{subdir_name}_chunk_{chunk_index}.mbox") + current_chunk = mailbox.mbox(chunk_path, create=True) + chunks.append(chunk_path) + chunk_index += 1 + current_chunk.add(email) + email_index += 1 + if current_chunk is not None: + current_chunk.close() + except Exception as e: + logger.error(f"Error splitting mbox file: {e}") + shutil.rmtree(temp_dir) + return [] + + logger.debug(f"Created {len(chunks)} chunks") + return chunks + def convert_mbox_to_markdown(mbox_file): try: - # Create a subdirectory in the output directory with the name of the .mbox file - base_name = os.path.basename(mbox_file) + logger.debug("Converting mbox to markdown") + chunks = split_mbox(mbox_file) + if not chunks: + logger.error(f"Failed to split mbox file: {mbox_file}") + return + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = {executor.submit(process_chunk, chunk): chunk for chunk in chunks} + for future in tqdm(as_completed(futures), total=len(futures), desc="Processing chunks"): + chunk = futures[future] + try: + future.result() + logger.info(f"Completed processing chunk: {chunk}") + except Exception as e: + logger.error(f"Error processing chunk {chunk}: {e}") + except Exception as e: + logger.error(f"Error processing mbox file {mbox_file}: {e}") + +def process_chunk(chunk_file): + try: + logger.debug("Processing chunk") + base_name = os.path.basename(chunk_file) subdir_name = os.path.splitext(base_name)[0] # Remove the .mbox extension output_subdir = os.path.join(output_dir, subdir_name) os.makedirs(output_subdir, exist_ok=True) - logger.info(f"Processing .mbox file: {mbox_file}") - mbox = mailbox.mbox(mbox_file) + logger.info(f"Processing chunk file: {chunk_file}") + mbox = mailbox.mbox(chunk_file) # Show progress bar total_emails = len(mbox) - logger.info(f"Total emails to process: {total_emails}") + logger.info(f"Total emails to process in chunk: {total_emails}") - for i, email in tqdm(enumerate(mbox), total=total_emails, desc='Converting emails'): + for i, email in enumerate(mbox): logger.info(f"Processing email {i + 1}/{total_emails}") save_email_as_markdown(email, i, output_subdir) - logger.info(f"Completed processing {mbox_file}") + logger.info(f"Completed processing chunk file: {chunk_file}") except Exception as e: - logger.error(f"Error processing mbox file {mbox_file}: {e}") + logger.error(f"Error processing chunk file {chunk_file}: {e}") class MboxFileHandler(FileSystemEventHandler): def on_created(self, event):