Source code for genome_entropy.io.fasta

"""FASTA file reading and writing utilities."""

import gzip
from pathlib import Path
from typing import Dict, Iterator, Tuple, Union

from ..logging_config import get_logger

logger = get_logger(__name__)


[docs] def read_fasta(fasta_path: Union[str, Path]) -> Dict[str, str]: """Read a FASTA file and return a dictionary of sequence_id -> sequence. Automatically detects and handles gzipped files (ending in .gz). Args: fasta_path: Path to FASTA file (plain text or gzipped) Returns: Dictionary mapping sequence IDs to sequences Raises: FileNotFoundError: If the FASTA file doesn't exist ValueError: If the FASTA file is malformed """ fasta_path = Path(fasta_path) logger.info("Reading FASTA file: %s", fasta_path) if not fasta_path.exists(): logger.error("FASTA file not found: %s", fasta_path) raise FileNotFoundError(f"FASTA file not found: {fasta_path}") sequences = {} current_id = None current_seq_parts = [] # Auto-detect gzipped files by extension is_gzipped = str(fasta_path).endswith(".gz") open_func = gzip.open if is_gzipped else open mode = "rt" if is_gzipped else "r" with open_func(fasta_path, mode) as f: for line in f: line = line.strip() if not line: continue if line.startswith(">"): # Save previous sequence if exists if current_id is not None: sequences[current_id] = "".join(current_seq_parts) logger.debug( "Read sequence '%s' (length=%d)", current_id, len(sequences[current_id]), ) # Start new sequence current_id = line[1:].split()[0] # Take first word after > current_seq_parts = [] else: if current_id is None: logger.error("FASTA sequence found before header in %s", fasta_path) raise ValueError("FASTA sequence found before header") current_seq_parts.append(line.upper()) # Save last sequence if current_id is not None: sequences[current_id] = "".join(current_seq_parts) logger.debug( "Read sequence '%s' (length=%d)", current_id, len(sequences[current_id]) ) if not sequences: logger.error("No sequences found in FASTA file: %s", fasta_path) raise ValueError(f"No sequences found in FASTA file: {fasta_path}") logger.info("Successfully read %d sequence(s) from %s", len(sequences), fasta_path) return sequences
[docs] def read_fasta_iter(fasta_path: Union[str, Path]) -> Iterator[Tuple[str, str]]: """Read a FASTA file and yield (sequence_id, sequence) tuples. Memory-efficient iterator for large FASTA files. Automatically detects and handles gzipped files (ending in .gz). Args: fasta_path: Path to FASTA file (plain text or gzipped) Yields: Tuples of (sequence_id, sequence) Raises: FileNotFoundError: If the FASTA file doesn't exist ValueError: If the FASTA file is malformed """ fasta_path = Path(fasta_path) if not fasta_path.exists(): raise FileNotFoundError(f"FASTA file not found: {fasta_path}") current_id = None current_seq_parts = [] # Auto-detect gzipped files by extension is_gzipped = str(fasta_path).endswith(".gz") open_func = gzip.open if is_gzipped else open mode = "rt" if is_gzipped else "r" with open_func(fasta_path, mode) as f: for line in f: line = line.strip() if not line: continue if line.startswith(">"): # Yield previous sequence if exists if current_id is not None: yield (current_id, "".join(current_seq_parts)) # Start new sequence current_id = line[1:].split()[0] current_seq_parts = [] else: if current_id is None: raise ValueError("FASTA sequence found before header") current_seq_parts.append(line.upper()) # Yield last sequence if current_id is not None: yield (current_id, "".join(current_seq_parts))
[docs] def write_fasta( sequences: Dict[str, str], output_path: Union[str, Path], line_width: int = 80 ) -> None: """Write sequences to a FASTA file. Automatically compresses output if filename ends with .gz. Args: sequences: Dictionary mapping sequence IDs to sequences output_path: Path to output FASTA file (plain text or .gz for compressed) line_width: Maximum line width for sequence lines (default: 80) """ output_path = Path(output_path) logger.info("Writing %d sequence(s) to FASTA file: %s", len(sequences), output_path) output_path.parent.mkdir(parents=True, exist_ok=True) # Auto-detect gzipped output by extension is_gzipped = str(output_path).endswith(".gz") open_func = gzip.open if is_gzipped else open mode = "wt" if is_gzipped else "w" with open_func(output_path, mode) as f: for seq_id, sequence in sequences.items(): f.write(f">{seq_id}\n") # Write sequence in chunks of line_width for i in range(0, len(sequence), line_width): f.write(sequence[i : i + line_width] + "\n") logger.debug("Wrote sequence '%s' (length=%d)", seq_id, len(sequence)) logger.info("Successfully wrote FASTA file: %s", output_path)