"""JSON serialization for data models."""
import gzip
import json
from dataclasses import asdict, is_dataclass
from pathlib import Path
from typing import Any, Union
from ..logging_config import get_logger
logger = get_logger(__name__)
# Schema version for tracking output format changes
SCHEMA_VERSION = "2.0.0"
[docs]
def to_json_dict(obj: Any) -> Any:
"""Convert a dataclass object to a JSON-serializable dictionary.
Recursively handles nested dataclasses, lists, and dictionaries.
Args:
obj: Object to convert (typically a dataclass instance)
Returns:
JSON-serializable dictionary
"""
if is_dataclass(obj):
return {k: to_json_dict(v) for k, v in asdict(obj).items()}
elif isinstance(obj, list):
return [to_json_dict(item) for item in obj]
elif isinstance(obj, dict):
return {k: to_json_dict(v) for k, v in obj.items()}
else:
return obj
[docs]
def convert_pipeline_result_to_unified(pipeline_result):
"""Convert PipelineResult to UnifiedPipelineResult format.
This function transforms the old redundant format (separate orfs, proteins,
three_dis lists) into the new unified format where each feature appears
exactly once with all its related data organized hierarchically.
OLD FORMAT PROBLEM:
-------------------
The old format had three parallel lists:
- orfs: [ORF1, ORF2, ...]
- proteins: [{orf: ORF1, aa_seq: ...}, {orf: ORF2, aa_seq: ...}, ...]
- three_dis: [{protein: {orf: ORF1, ...}, 3di: ...}, ...]
This caused:
1. ORF data duplicated 3 times (in orfs, inside proteins, inside three_dis)
2. Protein data duplicated 2 times (in proteins, inside three_dis)
3. ~2-3x larger files due to redundancy
4. Risk of inconsistency if data differs between copies
NEW UNIFIED FORMAT:
-------------------
Single features dictionary with hierarchical organization:
- features: {
"orf_1": {
location: {start, end, strand, frame},
dna: {sequence, length},
protein: {sequence, length},
three_di: {encoding, length, method, model, device},
metadata: {parent_id, table_id, has_start, has_stop, in_genbank},
entropy: {dna_entropy, protein_entropy, three_di_entropy}
}
}
Benefits:
1. Each piece of information stored exactly once
2. 40-50% smaller file sizes
3. Direct O(1) access by orf_id
4. Clear hierarchical organization matching biological concepts
5. Single source of truth - no inconsistency possible
Args:
pipeline_result: PipelineResult object or list of PipelineResult objects
Returns:
UnifiedPipelineResult object or list of UnifiedPipelineResult objects
"""
# Import here to avoid circular imports
from ..pipeline.types import (
UnifiedPipelineResult,
UnifiedFeature,
FeatureLocation,
FeatureDNA,
FeatureProtein,
FeatureThreeDi,
FeatureMetadata,
FeatureEntropy,
)
# Handle both single result and list of results
if isinstance(pipeline_result, list):
return [convert_pipeline_result_to_unified(r) for r in pipeline_result]
# Extract the PipelineResult object
result = pipeline_result
# Build a dictionary of features by orf_id
# This replaces the three separate lists (orfs, proteins, three_dis)
# with a single unified structure where each ORF appears exactly once
features = {}
# Create lookup dictionaries for efficient O(1) access
# Maps: orf_id -> ProteinRecord
proteins_by_orf_id = {p.orf.orf_id: p for p in result.proteins}
# Maps: orf_id -> ThreeDiRecord
three_dis_by_orf_id = {td.protein.orf.orf_id: td for td in result.three_dis}
# Process each ORF and merge data from all three sources
for orf in result.orfs:
orf_id = orf.orf_id
# Get corresponding protein and 3Di records
# These contain the ORF data nested inside them (redundancy!)
protein = proteins_by_orf_id.get(orf_id)
three_di_record = three_dis_by_orf_id.get(orf_id)
# Validate that we have all the data
# (Should always be true unless pipeline failed partially)
if protein is None:
logger.warning(f"No protein found for ORF {orf_id}, skipping")
continue
if three_di_record is None:
logger.warning(f"No 3Di encoding found for ORF {orf_id}, skipping")
continue
# Extract entropy values for this feature from the entropy report
# OLD: entropy had separate dicts for orf_nt, protein_aa, three_di
# NEW: we consolidate these into a single entropy sub-object per feature
dna_entropy = result.entropy.orf_nt_entropy.get(orf_id, 0.0)
protein_entropy = result.entropy.protein_aa_entropy.get(orf_id, 0.0)
three_di_entropy = result.entropy.three_di_entropy.get(orf_id, 0.0)
# Build the unified feature
# Instead of storing the ORF object three times, we extract each
# piece of information once and organize it hierarchically
unified_feature = UnifiedFeature(
orf_id=orf_id,
# Genomic location (from ORF)
location=FeatureLocation(
start=orf.start,
end=orf.end,
strand=orf.strand,
frame=orf.frame,
),
# DNA sequence (from ORF) - stored once, not three times
dna=FeatureDNA(
nt_sequence=orf.nt_sequence,
length=len(orf.nt_sequence),
),
# Protein sequence (from ProteinRecord) - stored once, not twice
protein=FeatureProtein(
aa_sequence=protein.aa_sequence,
length=protein.aa_length,
),
# 3Di encoding (from ThreeDiRecord) - stored once
three_di=FeatureThreeDi(
encoding=three_di_record.three_di,
length=len(three_di_record.three_di),
method=three_di_record.method,
model_name=three_di_record.model_name,
inference_device=three_di_record.inference_device,
),
# Metadata (from ORF) - organized separately for clarity
metadata=FeatureMetadata(
parent_id=orf.parent_id,
table_id=orf.table_id,
has_start_codon=orf.has_start_codon,
has_stop_codon=orf.has_stop_codon,
in_genbank=orf.in_genbank,
),
# Entropy values (from EntropyReport) - consolidated per feature
entropy=FeatureEntropy(
dna_entropy=dna_entropy,
protein_entropy=protein_entropy,
three_di_entropy=three_di_entropy,
),
)
# Store in dictionary keyed by orf_id
# This enables O(1) lookup instead of O(n) list search
features[orf_id] = unified_feature
# Validate that no features were lost during conversion
expected_count = len(result.orfs)
actual_count = len(features)
if actual_count != expected_count:
logger.warning(
f"Feature count mismatch: expected {expected_count}, got {actual_count}"
)
# Create the unified result with schema version for compatibility tracking
unified_result = UnifiedPipelineResult(
schema_version=SCHEMA_VERSION, # v2.0.0 for new unified format
input_id=result.input_id,
input_dna_length=result.input_dna_length,
dna_entropy_global=result.entropy.dna_entropy_global,
alphabet_sizes=result.entropy.alphabet_sizes,
features=features, # Single unified dictionary replaces three lists
)
return unified_result
[docs]
def write_json(data: Any, output_path: Union[str, Path], indent: int = 2) -> None:
"""Write data to a JSON file.
Automatically handles dataclass objects by converting them to dictionaries.
If data contains PipelineResult objects, they are automatically converted
to the new unified format to eliminate redundancy.
Automatically compresses output if filename ends with .gz.
AUTOMATIC CONVERSION:
---------------------
This function transparently converts old-format PipelineResult objects to
the new unified format. This means:
1. Users don't need to manually call convert_pipeline_result_to_unified()
2. All JSON output from the pipeline automatically uses the new format
3. The conversion happens only once during serialization
4. No changes needed to pipeline code or user scripts
MAPPING: Old Keys → New Structure
----------------------------------
OLD FORMAT:
- orfs[i].orf_id → features[orf_id].orf_id
- orfs[i].start → features[orf_id].location.start
- orfs[i].nt_sequence → features[orf_id].dna.nt_sequence
- proteins[i].aa_sequence → features[orf_id].protein.aa_sequence
- three_dis[i].three_di → features[orf_id].three_di.encoding
- entropy.orf_nt_entropy[id] → features[id].entropy.dna_entropy
NEW FORMAT adds:
- schema_version: "2.0.0" (for compatibility tracking)
- features: dict (replaces orfs, proteins, three_dis lists)
- Hierarchical organization (location, dna, protein, three_di, metadata, entropy)
Args:
data: Data to write (dataclass, dict, list, etc.)
output_path: Path to output JSON file (plain text or .gz for compressed)
indent: Indentation level for pretty printing (default: 2)
"""
output_path = Path(output_path)
logger.info("Writing JSON data to: %s", output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
# Convert PipelineResult to unified format if needed
# This conversion happens transparently to ensure all JSON output
# uses the new redundancy-free format
# Import here to avoid circular imports
try:
from ..pipeline.runner import PipelineResult
if isinstance(data, (PipelineResult, list)):
# Check if we have a list of PipelineResult objects
if (
isinstance(data, list)
and len(data) > 0
and isinstance(data[0], PipelineResult)
):
logger.info("Converting PipelineResult to unified format")
data = convert_pipeline_result_to_unified(data)
# Check if we have a single PipelineResult object
elif isinstance(data, PipelineResult):
logger.info("Converting PipelineResult to unified format")
data = convert_pipeline_result_to_unified(data)
except ImportError:
# PipelineResult not available, skip conversion
# (e.g., when writing non-pipeline data)
pass
# Convert dataclasses to dictionaries recursively
json_data = to_json_dict(data)
# Auto-detect gzipped output by extension
is_gzipped = str(output_path).endswith(".gz")
open_func = gzip.open if is_gzipped else open
mode = "wt" if is_gzipped else "w"
# Write to file with pretty printing
with open_func(output_path, mode, encoding="utf-8") as f:
json.dump(json_data, f, indent=indent)
logger.info("Successfully wrote JSON file: %s", output_path)
[docs]
def read_json(input_path: Union[str, Path]) -> Any:
"""Read JSON data from a file.
Automatically detects and handles gzipped files (ending in .gz).
Args:
input_path: Path to input JSON file (plain text or gzipped)
Returns:
Parsed JSON data (dict, list, etc.)
Raises:
FileNotFoundError: If the JSON file doesn't exist
json.JSONDecodeError: If the file contains invalid JSON
"""
input_path = Path(input_path)
logger.info("Reading JSON file: %s", input_path)
if not input_path.exists():
logger.error("JSON file not found: %s", input_path)
raise FileNotFoundError(f"JSON file not found: {input_path}")
# Auto-detect gzipped files by extension
is_gzipped = str(input_path).endswith(".gz")
open_func = gzip.open if is_gzipped else open
mode = "rt" if is_gzipped else "r"
with open_func(input_path, mode, encoding="utf-8") as f:
data = json.load(f)
logger.info("Successfully read JSON file: %s", input_path)
return data