Source code for varannote.tools.annotator

#!/usr/bin/env python3
"""
Variant Annotator Tool - Main annotation interface
"""

import sys
import json
import pandas as pd
from pathlib import Path
from typing import Dict, List, Optional
from tqdm import tqdm

from ..utils.vcf_parser import VCFParser
from ..utils.annotation_db import AnnotationDatabase
from ..utils.real_annotation_db import RealAnnotationDatabase
from ..core.annotator import VariantAnnotator

[docs] class VariantAnnotatorTool: """ Enhanced variant annotation tool Provides comprehensive variant annotation using either mock or real databases with advanced features like parallel processing and confidence scoring. """
[docs] def __init__(self, genome: str = "hg38", databases: Optional[List[str]] = None, verbose: bool = False, use_real_db: bool = False, cache_dir: Optional[str] = None, use_parallel: bool = False, max_workers: int = 4): """ Initialize enhanced variant annotator tool Args: genome: Reference genome version databases: List of databases to use verbose: Enable verbose output use_real_db: Use real databases instead of mock data cache_dir: Cache directory for real database results use_parallel: Enable parallel processing max_workers: Maximum number of parallel workers """ self.genome = genome self.databases = databases or ["clinvar", "gnomad", "dbsnp", "cosmic", "omim", "pharmgkb", "clingen", "hgmd", "ensembl"] self.verbose = verbose self.use_real_db = use_real_db self.cache_dir = cache_dir self.use_parallel = use_parallel self.max_workers = max_workers # Initialize components self.vcf_parser = VCFParser() self.functional_annotator = VariantAnnotator(genome=genome) # Initialize database (mock or real) if use_real_db: if verbose: print("🔗 Initializing enhanced real database connections...") self.annotation_db = RealAnnotationDatabase( genome=genome, cache_dir=cache_dir, use_cache=True ) else: if verbose: print("🎭 Using mock databases for testing...") self.annotation_db = AnnotationDatabase(genome=genome)
[docs] def annotate_file(self, input_file: str, output_file: str, output_format: str = "vcf") -> Dict: """ Annotate variants from a VCF file with enhanced features Args: input_file: Input VCF file path output_file: Output file path output_format: Output format (vcf, tsv, json) Returns: Dictionary with annotation statistics including confidence scores """ if self.verbose: print(f"🔧 Initialized Enhanced VariantAnnotator with genome: {self.genome}") print(f"⚡ Parallel processing: {'enabled' if self.use_parallel else 'disabled'}") # Parse VCF file if self.verbose: print(f"📖 Reading variants from {input_file}") variants = self.vcf_parser.parse_file(input_file) if self.verbose: print(f"🔍 Found {len(variants)} variants to annotate") # Annotate variants annotated_variants = self._annotate_variants(variants) # Calculate confidence statistics confidence_stats = self._calculate_confidence_statistics(annotated_variants) # Save results self._save_results(annotated_variants, output_file, output_format) result = { 'variants_processed': len(annotated_variants), 'output_file': output_file, 'output_format': output_format, 'confidence_stats': confidence_stats } return result
def _annotate_variants(self, variants: List[Dict]) -> List[Dict]: """Annotate a list of variants with enhanced processing""" if self.use_real_db and self.use_parallel and len(variants) > 1: # Use enhanced batch annotation with parallel processing return self.annotation_db.batch_annotate( variants, databases=self.databases, max_workers=self.max_workers, use_parallel=True ) else: # Use sequential processing return self._annotate_variants_sequential(variants) def _annotate_variants_sequential(self, variants: List[Dict]) -> List[Dict]: """Sequential variant annotation""" annotated_variants = [] # Progress bar progress_bar = tqdm( variants, desc="Annotating variants", disable=not self.verbose ) for variant in progress_bar: try: # Get functional annotations functional_annotations = self.functional_annotator.get_functional_annotations(variant) # Get database annotations if self.use_real_db: # Use real databases db_annotations = self.annotation_db.get_annotations(variant, "all") else: # Use mock databases db_annotations = self.annotation_db.get_annotations(variant, "all") # Combine all annotations annotated_variant = { **variant, **functional_annotations, **db_annotations } annotated_variants.append(annotated_variant) except Exception as e: if self.verbose: print(f"Warning: Failed to annotate variant {variant.get('variant_id', 'unknown')}: {e}") # Add variant with minimal annotation annotated_variant = { **variant, 'annotation_error': str(e) } annotated_variants.append(annotated_variant) return annotated_variants def _calculate_confidence_statistics(self, variants: List[Dict]) -> Dict: """Calculate confidence score statistics""" confidence_scores = [] high_confidence_count = 0 for variant in variants: confidence = variant.get('annotation_confidence', 0.0) if isinstance(confidence, (int, float)): confidence_scores.append(confidence) if confidence >= 0.7: # High confidence threshold high_confidence_count += 1 if not confidence_scores: return { 'average': 0.0, 'median': 0.0, 'high_confidence_count': 0, 'total_variants': len(variants) } return { 'average': sum(confidence_scores) / len(confidence_scores), 'median': sorted(confidence_scores)[len(confidence_scores) // 2], 'high_confidence_count': high_confidence_count, 'total_variants': len(variants), 'min_confidence': min(confidence_scores), 'max_confidence': max(confidence_scores) } def _save_results(self, variants: List[Dict], output_file: str, output_format: str): """Save annotated variants to file with enhanced fields""" output_path = Path(output_file) if output_format == "vcf": self._save_vcf(variants, output_path) elif output_format == "tsv": self._save_tsv(variants, output_path) elif output_format == "json": self._save_json(variants, output_path) else: raise ValueError(f"Unsupported output format: {output_format}") def _save_vcf(self, variants: List[Dict], output_path: Path): """Save variants in VCF format with enhanced annotations in INFO field""" with open(output_path, 'w') as f: # Write VCF header f.write("##fileformat=VCFv4.2\n") f.write(f"##reference={self.genome}\n") # Add INFO field definitions for annotations info_fields = [ "##INFO=<ID=GENE,Number=1,Type=String,Description=\"Gene symbol\">", "##INFO=<ID=CONSEQUENCE,Number=1,Type=String,Description=\"Variant consequence\">", "##INFO=<ID=CADD,Number=1,Type=Float,Description=\"CADD pathogenicity score\">", "##INFO=<ID=CLINVAR_SIG,Number=1,Type=String,Description=\"ClinVar significance\">", "##INFO=<ID=GNOMAD_AF,Number=1,Type=Float,Description=\"gnomAD allele frequency\">", "##INFO=<ID=DBSNP_ID,Number=1,Type=String,Description=\"dbSNP identifier\">", "##INFO=<ID=COSMIC_ID,Number=1,Type=String,Description=\"COSMIC identifier\">", "##INFO=<ID=OMIM_DISEASES,Number=1,Type=String,Description=\"OMIM disease associations\">", "##INFO=<ID=PHARMGKB_DRUGS,Number=1,Type=String,Description=\"PharmGKB drug interactions\">", "##INFO=<ID=CLINGEN_VALIDITY,Number=1,Type=String,Description=\"ClinGen gene-disease validity\">", "##INFO=<ID=CLINGEN_DISEASES,Number=1,Type=String,Description=\"ClinGen disease associations\">", "##INFO=<ID=HGMD_ID,Number=1,Type=String,Description=\"HGMD mutation identifier\">", "##INFO=<ID=HGMD_PATHOGENICITY,Number=1,Type=String,Description=\"HGMD pathogenicity classification\">", "##INFO=<ID=ENSEMBL_CONSEQUENCE,Number=1,Type=String,Description=\"Ensembl VEP consequence\">", "##INFO=<ID=ENSEMBL_IMPACT,Number=1,Type=String,Description=\"Ensembl impact prediction\">", "##INFO=<ID=CONFIDENCE,Number=1,Type=Float,Description=\"Annotation confidence score\">" ] for info_field in info_fields: f.write(f"{info_field}\n") # Write column header f.write("#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\n") # Write variants for variant in variants: info_parts = [] # Add annotations to INFO field if variant.get('gene_symbol'): info_parts.append(f"GENE={variant['gene_symbol']}") if variant.get('consequence'): info_parts.append(f"CONSEQUENCE={variant['consequence']}") if variant.get('cadd_score'): info_parts.append(f"CADD={variant['cadd_score']}") if variant.get('clinvar_significance'): info_parts.append(f"CLINVAR_SIG={variant['clinvar_significance']}") if variant.get('gnomad_af'): info_parts.append(f"GNOMAD_AF={variant['gnomad_af']}") if variant.get('dbsnp_id'): info_parts.append(f"DBSNP_ID={variant['dbsnp_id']}") if variant.get('cosmic_id'): info_parts.append(f"COSMIC_ID={variant['cosmic_id']}") if variant.get('omim_diseases'): # Escape semicolons in OMIM diseases diseases = str(variant['omim_diseases']).replace(';', '%3B') info_parts.append(f"OMIM_DISEASES={diseases}") if variant.get('pharmgkb_drugs'): # Escape semicolons in PharmGKB drugs drugs = str(variant['pharmgkb_drugs']).replace(';', '%3B') info_parts.append(f"PHARMGKB_DRUGS={drugs}") if variant.get('clingen_validity'): info_parts.append(f"CLINGEN_VALIDITY={variant['clingen_validity']}") if variant.get('clingen_diseases'): info_parts.append(f"CLINGEN_DISEASES={variant['clingen_diseases']}") if variant.get('hgmd_id'): info_parts.append(f"HGMD_ID={variant['hgmd_id']}") if variant.get('hgmd_pathogenicity'): info_parts.append(f"HGMD_PATHOGENICITY={variant['hgmd_pathogenicity']}") if variant.get('ensembl_consequence'): info_parts.append(f"ENSEMBL_CONSEQUENCE={variant['ensembl_consequence']}") if variant.get('ensembl_impact'): info_parts.append(f"ENSEMBL_IMPACT={variant['ensembl_impact']}") if variant.get('annotation_confidence'): info_parts.append(f"CONFIDENCE={variant['annotation_confidence']}") info_str = ";".join(info_parts) if info_parts else "." # Write variant line f.write(f"{variant['CHROM']}\t{variant['POS']}\t{variant.get('ID', '.')}\t" f"{variant['REF']}\t{variant['ALT']}\t{variant.get('QUAL', '.')}\t" f"{variant.get('FILTER', '.')}\t{info_str}\n") def _save_tsv(self, variants: List[Dict], output_path: Path): """Save variants in TSV format with enhanced columns""" # Convert to DataFrame df = pd.DataFrame(variants) # Select key columns for TSV output key_columns = [ 'CHROM', 'POS', 'REF', 'ALT', 'variant_id', 'gene_symbol', 'consequence', 'cadd_score', 'clinvar_significance', 'gnomad_af', 'dbsnp_id', 'cosmic_id', 'omim_diseases', 'omim_inheritance', 'pharmgkb_drugs', 'pharmgkb_level', 'clingen_validity', 'clingen_diseases', 'clingen_dosage_hi', 'clingen_evidence', 'hgmd_id', 'hgmd_pathogenicity', 'hgmd_disease', 'hgmd_evidence', 'ensembl_consequence', 'ensembl_impact', 'ensembl_transcript', 'ensembl_sift', 'ensembl_polyphen', 'annotation_confidence' ] # Keep only columns that exist available_columns = [col for col in key_columns if col in df.columns] df_output = df[available_columns] # Save to TSV df_output.to_csv(output_path, sep='\t', index=False) def _save_json(self, variants: List[Dict], output_path: Path): """Save variants in JSON format""" with open(output_path, 'w') as f: json.dump(variants, f, indent=2, default=str)
[docs] def get_annotation_summary(self, variants: List[Dict]) -> Dict: """Get enhanced summary statistics for annotations""" total_variants = len(variants) # Count annotations with_gene = sum(1 for v in variants if v.get('gene_symbol')) with_clinvar = sum(1 for v in variants if v.get('clinvar_significance')) with_gnomad = sum(1 for v in variants if v.get('gnomad_af')) with_dbsnp = sum(1 for v in variants if v.get('dbsnp_id')) with_cosmic = sum(1 for v in variants if v.get('cosmic_id')) with_omim = sum(1 for v in variants if v.get('omim_diseases')) with_pharmgkb = sum(1 for v in variants if v.get('pharmgkb_drugs')) # Pathogenicity distribution pathogenic = sum(1 for v in variants if v.get('clinvar_significance') in ['Pathogenic', 'Likely_pathogenic']) benign = sum(1 for v in variants if v.get('clinvar_significance') in ['Benign', 'Likely_benign']) # Confidence statistics confidence_stats = self._calculate_confidence_statistics(variants) return { 'total_variants': total_variants, 'functional_annotations': { 'with_gene_symbol': with_gene, 'percentage': round(with_gene / total_variants * 100, 1) if total_variants > 0 else 0 }, 'database_coverage': { 'clinvar': {'count': with_clinvar, 'percentage': round(with_clinvar / total_variants * 100, 1)}, 'gnomad': {'count': with_gnomad, 'percentage': round(with_gnomad / total_variants * 100, 1)}, 'dbsnp': {'count': with_dbsnp, 'percentage': round(with_dbsnp / total_variants * 100, 1)}, 'cosmic': {'count': with_cosmic, 'percentage': round(with_cosmic / total_variants * 100, 1)}, 'omim': {'count': with_omim, 'percentage': round(with_omim / total_variants * 100, 1)}, 'pharmgkb': {'count': with_pharmgkb, 'percentage': round(with_pharmgkb / total_variants * 100, 1)} }, 'clinical_significance': { 'pathogenic': pathogenic, 'benign': benign, 'unknown': total_variants - pathogenic - benign }, 'confidence_statistics': confidence_stats }
[docs] def main(): """Command line interface for enhanced variant annotator""" import click @click.command() @click.argument('input_file', type=click.Path(exists=True)) @click.option('--output', '-o', type=click.Path(), help='Output file path') @click.option('--format', '-f', type=click.Choice(['vcf', 'tsv', 'json']), default='vcf', help='Output format') @click.option('--databases', '-d', multiple=True, help='Annotation databases to use') @click.option('--genome', '-g', type=click.Choice(['hg19', 'hg38']), default='hg38', help='Reference genome version') @click.option('--verbose', '-v', is_flag=True, help='Enable verbose output') @click.option('--use_real_db', '-r', is_flag=True, help='Use real databases instead of mock data') @click.option('--cache_dir', '-c', type=click.Path(), help='Cache directory for real database results') @click.option('--parallel', is_flag=True, help='Enable parallel processing') @click.option('--max_workers', type=int, default=4, help='Maximum parallel workers') def cli(input_file, output, format, databases, genome, verbose, use_real_db, cache_dir, parallel, max_workers): """Annotate variants with enhanced genomic information.""" if not output: output = Path(input_file).stem + f"_annotated.{format}" try: annotator = VariantAnnotatorTool( genome=genome, databases=list(databases) if databases else None, verbose=verbose, use_real_db=use_real_db, cache_dir=cache_dir, use_parallel=parallel, max_workers=max_workers ) result = annotator.annotate_file(input_file, output, format) click.echo(f"✅ Enhanced annotation complete!") click.echo(f"📊 Variants processed: {result['variants_processed']}") click.echo(f"📁 Output saved to: {output}") # Show confidence statistics if 'confidence_stats' in result: stats = result['confidence_stats'] click.echo(f"📈 Confidence: avg={stats['average']:.3f}, high_conf={stats['high_confidence_count']}") except Exception as e: click.echo(f"❌ Error: {str(e)}", err=True) sys.exit(1) cli()
if __name__ == '__main__': main()