#!/usr/bin/env python3
"""
Advanced Filtering System for VarAnnote v1.0.0
Provides comprehensive variant filtering capabilities including:
- Quality score filtering
- Population frequency thresholds
- Clinical significance filters
- Gene-based filtering
- Consequence type filtering
- Custom filter expressions
"""
import re
import operator
from typing import Dict, List, Any, Optional, Union, Callable
from dataclasses import dataclass
from enum import Enum
import logging
from .logger import get_logger
[docs]
class FilterOperator(Enum):
"""Filter operators"""
EQUALS = "=="
NOT_EQUALS = "!="
GREATER_THAN = ">"
GREATER_EQUAL = ">="
LESS_THAN = "<"
LESS_EQUAL = "<="
CONTAINS = "contains"
NOT_CONTAINS = "not_contains"
IN = "in"
NOT_IN = "not_in"
REGEX = "regex"
IS_NULL = "is_null"
IS_NOT_NULL = "is_not_null"
[docs]
class ClinicalSignificance(Enum):
"""Clinical significance categories"""
PATHOGENIC = "Pathogenic"
LIKELY_PATHOGENIC = "Likely_pathogenic"
UNCERTAIN_SIGNIFICANCE = "Uncertain_significance"
LIKELY_BENIGN = "Likely_benign"
BENIGN = "Benign"
CONFLICTING = "Conflicting"
NOT_PROVIDED = "Not_provided"
[docs]
@dataclass
class FilterRule:
"""Individual filter rule"""
field: str
operator: FilterOperator
value: Any
description: Optional[str] = None
def __post_init__(self):
if isinstance(self.operator, str):
self.operator = FilterOperator(self.operator)
[docs]
@dataclass
class FilterSet:
"""Collection of filter rules with logic"""
name: str
rules: List[FilterRule]
logic: str = "AND" # AND, OR
description: Optional[str] = None
enabled: bool = True
[docs]
class VariantFilter:
"""
Advanced variant filtering system
Features:
- Multiple filter criteria
- Logical combinations (AND/OR)
- Custom filter expressions
- Predefined filter sets
- Performance optimization
"""
[docs]
def __init__(self):
"""Initialize variant filter"""
self.logger = get_logger("variant_filter")
# Operator mapping
self.operators = {
FilterOperator.EQUALS: operator.eq,
FilterOperator.NOT_EQUALS: operator.ne,
FilterOperator.GREATER_THAN: operator.gt,
FilterOperator.GREATER_EQUAL: operator.ge,
FilterOperator.LESS_THAN: operator.lt,
FilterOperator.LESS_EQUAL: operator.le,
FilterOperator.CONTAINS: self._contains,
FilterOperator.NOT_CONTAINS: self._not_contains,
FilterOperator.IN: self._in,
FilterOperator.NOT_IN: self._not_in,
FilterOperator.REGEX: self._regex_match,
FilterOperator.IS_NULL: self._is_null,
FilterOperator.IS_NOT_NULL: self._is_not_null
}
# Predefined filter sets
self.predefined_filters = self._create_predefined_filters()
self.logger.info("Variant filter initialized")
[docs]
def apply_filter(self, variants: List[Dict[str, Any]],
filter_set: FilterSet) -> List[Dict[str, Any]]:
"""
Apply filter set to variants
Args:
variants: List of variant dictionaries
filter_set: Filter set to apply
Returns:
Filtered list of variants
"""
if not filter_set.enabled or not filter_set.rules:
return variants
filtered_variants = []
for variant in variants:
if self._evaluate_filter_set(variant, filter_set):
filtered_variants.append(variant)
self.logger.info(f"Filtered {len(variants)} variants to {len(filtered_variants)} "
f"using filter set '{filter_set.name}'")
return filtered_variants
[docs]
def apply_multiple_filters(self, variants: List[Dict[str, Any]],
filter_sets: List[FilterSet],
combine_logic: str = "AND") -> List[Dict[str, Any]]:
"""
Apply multiple filter sets
Args:
variants: List of variant dictionaries
filter_sets: List of filter sets to apply
combine_logic: How to combine filter sets (AND/OR)
Returns:
Filtered list of variants
"""
if not filter_sets:
return variants
# Filter enabled filter sets
enabled_filters = [f for f in filter_sets if f.enabled]
if not enabled_filters:
return variants
filtered_variants = []
for variant in variants:
if combine_logic.upper() == "AND":
# All filter sets must pass
if all(self._evaluate_filter_set(variant, fs) for fs in enabled_filters):
filtered_variants.append(variant)
else: # OR
# At least one filter set must pass
if any(self._evaluate_filter_set(variant, fs) for fs in enabled_filters):
filtered_variants.append(variant)
self.logger.info(f"Applied {len(enabled_filters)} filter sets with {combine_logic} logic: "
f"{len(variants)} -> {len(filtered_variants)} variants")
return filtered_variants
[docs]
def create_quality_filter(self, min_quality: float = 0.0,
max_population_freq: float = 1.0,
include_uncertain: bool = True) -> FilterSet:
"""Create quality-based filter set"""
rules = []
# Quality score filter
if min_quality > 0:
rules.append(FilterRule(
field="quality_score",
operator=FilterOperator.GREATER_EQUAL,
value=min_quality,
description=f"Quality score >= {min_quality}"
))
# Population frequency filter
if max_population_freq < 1.0:
rules.append(FilterRule(
field="population_frequency",
operator=FilterOperator.LESS_EQUAL,
value=max_population_freq,
description=f"Population frequency <= {max_population_freq}"
))
# Clinical significance filter
if not include_uncertain:
rules.append(FilterRule(
field="clinical_significance",
operator=FilterOperator.NOT_IN,
value=["Uncertain_significance", "Conflicting", "Not_provided"],
description="Exclude uncertain clinical significance"
))
return FilterSet(
name="quality_filter",
rules=rules,
logic="AND",
description="Quality-based variant filtering"
)
[docs]
def create_clinical_filter(self,
significance_levels: List[str] = None,
exclude_benign: bool = False,
require_review: bool = False) -> FilterSet:
"""Create clinical significance filter set"""
rules = []
if significance_levels:
rules.append(FilterRule(
field="clinical_significance",
operator=FilterOperator.IN,
value=significance_levels,
description=f"Clinical significance in {significance_levels}"
))
if exclude_benign:
rules.append(FilterRule(
field="clinical_significance",
operator=FilterOperator.NOT_IN,
value=["Benign", "Likely_benign"],
description="Exclude benign variants"
))
if require_review:
rules.append(FilterRule(
field="review_status",
operator=FilterOperator.NOT_EQUALS,
value="no_assertion",
description="Require reviewed variants"
))
return FilterSet(
name="clinical_filter",
rules=rules,
logic="AND",
description="Clinical significance filtering"
)
[docs]
def create_gene_filter(self,
gene_list: List[str] = None,
gene_types: List[str] = None,
exclude_intergenic: bool = True) -> FilterSet:
"""Create gene-based filter set"""
rules = []
if gene_list:
rules.append(FilterRule(
field="gene_symbol",
operator=FilterOperator.IN,
value=gene_list,
description=f"Genes in {gene_list}"
))
if gene_types:
rules.append(FilterRule(
field="gene_type",
operator=FilterOperator.IN,
value=gene_types,
description=f"Gene types in {gene_types}"
))
if exclude_intergenic:
rules.append(FilterRule(
field="consequence",
operator=FilterOperator.NOT_CONTAINS,
value="intergenic",
description="Exclude intergenic variants"
))
return FilterSet(
name="gene_filter",
rules=rules,
logic="AND",
description="Gene-based filtering"
)
[docs]
def create_consequence_filter(self,
consequence_types: List[str] = None,
severity_threshold: str = "moderate") -> FilterSet:
"""Create consequence-based filter set"""
rules = []
# Severity mapping
severity_map = {
"high": ["stop_gained", "frameshift_variant", "start_lost", "stop_lost"],
"moderate": ["missense_variant", "inframe_deletion", "inframe_insertion"],
"low": ["synonymous_variant", "stop_retained_variant"],
"modifier": ["intron_variant", "upstream_variant", "downstream_variant"]
}
if consequence_types:
rules.append(FilterRule(
field="consequence",
operator=FilterOperator.IN,
value=consequence_types,
description=f"Consequences in {consequence_types}"
))
if severity_threshold in severity_map:
# Include consequences at or above threshold
allowed_consequences = []
severity_order = ["high", "moderate", "low", "modifier"]
threshold_index = severity_order.index(severity_threshold)
for i in range(threshold_index + 1):
allowed_consequences.extend(severity_map[severity_order[i]])
rules.append(FilterRule(
field="consequence",
operator=FilterOperator.IN,
value=allowed_consequences,
description=f"Consequence severity >= {severity_threshold}"
))
return FilterSet(
name="consequence_filter",
rules=rules,
logic="AND",
description="Consequence-based filtering"
)
[docs]
def create_custom_filter(self, expression: str) -> FilterSet:
"""
Create custom filter from expression
Args:
expression: Filter expression (e.g., "quality_score > 0.8 AND population_frequency < 0.01")
Returns:
FilterSet object
"""
# Parse expression into rules
rules = self._parse_filter_expression(expression)
return FilterSet(
name="custom_filter",
rules=rules,
logic="AND",
description=f"Custom filter: {expression}"
)
[docs]
def get_predefined_filter(self, name: str) -> Optional[FilterSet]:
"""Get predefined filter set by name"""
return self.predefined_filters.get(name)
[docs]
def list_predefined_filters(self) -> List[str]:
"""List available predefined filter names"""
return list(self.predefined_filters.keys())
[docs]
def get_filter_statistics(self, variants: List[Dict[str, Any]],
filter_set: FilterSet) -> Dict[str, Any]:
"""Get statistics about filter application"""
total_variants = len(variants)
filtered_variants = self.apply_filter(variants, filter_set)
filtered_count = len(filtered_variants)
return {
"filter_name": filter_set.name,
"total_variants": total_variants,
"filtered_variants": filtered_count,
"filtered_percentage": (filtered_count / total_variants * 100) if total_variants > 0 else 0,
"removed_variants": total_variants - filtered_count,
"removed_percentage": ((total_variants - filtered_count) / total_variants * 100) if total_variants > 0 else 0
}
def _evaluate_filter_set(self, variant: Dict[str, Any], filter_set: FilterSet) -> bool:
"""Evaluate filter set against variant"""
if not filter_set.rules:
return True
results = []
for rule in filter_set.rules:
result = self._evaluate_rule(variant, rule)
results.append(result)
# Apply logic
if filter_set.logic.upper() == "AND":
return all(results)
else: # OR
return any(results)
def _evaluate_rule(self, variant: Dict[str, Any], rule: FilterRule) -> bool:
"""Evaluate single filter rule"""
try:
# Get field value
field_value = self._get_field_value(variant, rule.field)
# Handle None values for comparison operators
if field_value is None and rule.operator in [
FilterOperator.GREATER_THAN, FilterOperator.GREATER_EQUAL,
FilterOperator.LESS_THAN, FilterOperator.LESS_EQUAL
]:
return False # None values fail numeric comparisons
# Apply operator
operator_func = self.operators.get(rule.operator)
if not operator_func:
self.logger.warning(f"Unknown operator: {rule.operator}")
return True
return operator_func(field_value, rule.value)
except Exception as e:
self.logger.warning(f"Error evaluating rule {rule.field} {rule.operator} {rule.value}: {e}")
return False # Default to exclude variant on error for stricter filtering
def _get_field_value(self, variant: Dict[str, Any], field: str) -> Any:
"""Get field value from variant, supporting nested fields"""
if '.' in field:
# Nested field access
parts = field.split('.')
value = variant
for part in parts:
if isinstance(value, dict) and part in value:
value = value[part]
else:
return None
return value
else:
return variant.get(field)
def _contains(self, field_value: Any, filter_value: Any) -> bool:
"""Contains operator"""
if field_value is None:
return False
return str(filter_value).lower() in str(field_value).lower()
def _not_contains(self, field_value: Any, filter_value: Any) -> bool:
"""Not contains operator"""
return not self._contains(field_value, filter_value)
def _in(self, field_value: Any, filter_value: List[Any]) -> bool:
"""In operator"""
if field_value is None:
return False
return field_value in filter_value
def _not_in(self, field_value: Any, filter_value: List[Any]) -> bool:
"""Not in operator"""
return not self._in(field_value, filter_value)
def _regex_match(self, field_value: Any, filter_value: str) -> bool:
"""Regex match operator"""
if field_value is None:
return False
try:
return bool(re.search(filter_value, str(field_value)))
except re.error as e:
self.logger.warning(f"Invalid regex pattern '{filter_value}': {e}")
return False
def _is_null(self, field_value: Any, filter_value: Any) -> bool:
"""Is null operator"""
return field_value is None or field_value == ""
def _is_not_null(self, field_value: Any, filter_value: Any) -> bool:
"""Is not null operator"""
return not self._is_null(field_value, filter_value)
def _parse_filter_expression(self, expression: str) -> List[FilterRule]:
"""Parse filter expression into rules (simplified implementation)"""
# This is a simplified parser - in production, you'd want a more robust one
rules = []
# Split by AND/OR (simplified)
parts = re.split(r'\s+(AND|OR)\s+', expression, flags=re.IGNORECASE)
for part in parts:
if part.upper() in ['AND', 'OR']:
continue
# Parse individual condition
rule = self._parse_condition(part.strip())
if rule:
rules.append(rule)
return rules
def _parse_condition(self, condition: str) -> Optional[FilterRule]:
"""Parse individual condition into FilterRule"""
# Match patterns like "field operator value"
patterns = [
(r'(\w+)\s*(>=|<=|>|<|==|!=)\s*([^\s]+)', 'comparison'),
(r'(\w+)\s+contains\s+(.+)', 'contains'),
(r'(\w+)\s+in\s+\[([^\]]+)\]', 'in')
]
for pattern, op_type in patterns:
match = re.match(pattern, condition, re.IGNORECASE)
if match:
field = match.group(1)
if op_type == 'comparison':
operator_str = match.group(2)
value_str = match.group(3)
# Convert value
try:
value = float(value_str)
except ValueError:
value = value_str.strip('"\'')
return FilterRule(
field=field,
operator=FilterOperator(operator_str),
value=value
)
elif op_type == 'contains':
value = match.group(2).strip('"\'')
return FilterRule(
field=field,
operator=FilterOperator.CONTAINS,
value=value
)
elif op_type == 'in':
values_str = match.group(2)
values = [v.strip().strip('"\'') for v in values_str.split(',')]
return FilterRule(
field=field,
operator=FilterOperator.IN,
value=values
)
return None
def _create_predefined_filters(self) -> Dict[str, FilterSet]:
"""Create predefined filter sets"""
filters = {}
# High confidence filter
filters["high_confidence"] = FilterSet(
name="high_confidence",
rules=[
FilterRule("quality_score", FilterOperator.GREATER_EQUAL, 0.8),
FilterRule("clinical_significance", FilterOperator.IN,
["Pathogenic", "Likely_pathogenic"]),
FilterRule("review_status", FilterOperator.NOT_EQUALS, "no_assertion")
],
description="High confidence pathogenic variants"
)
# Rare variants filter
filters["rare_variants"] = FilterSet(
name="rare_variants",
rules=[
FilterRule("population_frequency", FilterOperator.LESS_EQUAL, 0.01),
FilterRule("clinical_significance", FilterOperator.NOT_IN,
["Benign", "Likely_benign"])
],
description="Rare variants (MAF <= 1%)"
)
# Coding variants filter
filters["coding_variants"] = FilterSet(
name="coding_variants",
rules=[
FilterRule("consequence", FilterOperator.IN,
["missense_variant", "stop_gained", "frameshift_variant",
"start_lost", "stop_lost", "inframe_deletion", "inframe_insertion"]),
FilterRule("gene_symbol", FilterOperator.IS_NOT_NULL, None)
],
description="Protein-coding variants"
)
# Pharmacogenomics filter
filters["pharmacogenomics"] = FilterSet(
name="pharmacogenomics",
rules=[
FilterRule("drug_interactions", FilterOperator.IS_NOT_NULL, None),
FilterRule("pharmgkb_level", FilterOperator.IN, ["1A", "1B", "2A", "2B"])
],
description="Pharmacogenomically relevant variants"
)
return filters
# Global filter instance
_variant_filter: Optional[VariantFilter] = None
[docs]
def get_variant_filter() -> VariantFilter:
"""Get global variant filter instance"""
global _variant_filter
if _variant_filter is None:
_variant_filter = VariantFilter()
return _variant_filter
[docs]
def apply_quality_filter(variants: List[Dict[str, Any]],
min_quality: float = 0.0,
max_population_freq: float = 1.0) -> List[Dict[str, Any]]:
"""Apply quality filter to variants"""
filter_obj = get_variant_filter()
quality_filter = filter_obj.create_quality_filter(min_quality, max_population_freq)
return filter_obj.apply_filter(variants, quality_filter)
[docs]
def apply_clinical_filter(variants: List[Dict[str, Any]],
significance_levels: List[str] = None) -> List[Dict[str, Any]]:
"""Apply clinical significance filter to variants"""
filter_obj = get_variant_filter()
clinical_filter = filter_obj.create_clinical_filter(significance_levels)
return filter_obj.apply_filter(variants, clinical_filter)
[docs]
def apply_predefined_filter(variants: List[Dict[str, Any]],
filter_name: str) -> List[Dict[str, Any]]:
"""Apply predefined filter to variants"""
filter_obj = get_variant_filter()
predefined_filter = filter_obj.get_predefined_filter(filter_name)
if predefined_filter:
return filter_obj.apply_filter(variants, predefined_filter)
else:
raise ValueError(f"Unknown predefined filter: {filter_name}")