Source code for wub.bam.compare

# -*- coding: utf-8 -*-
"""Compares alignments in two BAM files."""

from six.moves import zip, zip_longest
import tqdm
from itertools import chain
from collections import OrderedDict
from wub.bam import common as bam_common


[docs]def is_coarse_match(aln_diff, tolerance): """Determine if start and end postions of two alignments are within the specified tolerance levels. :param aln_diff: Alignment diff structure as returned by compare_alignments. :returns: True or False :rtype: bool """ if abs(aln_diff['start_pos'][1] - aln_diff['start_pos'][0]) > tolerance: return False if abs(aln_diff['end_pos'][1] - aln_diff['end_pos'][0]) > tolerance: return False return True
[docs]def bam_compare(aln_one, aln_two, coarse_tolerance=50, strict_flags=False, in_format='BAM', verbose=False): """Count reads mapping to references in a BAM file. :param alignment_file: BAM file. :param min_aln_qual: Minimum mapping quality. :param verbose: Show progress bar. :returns: Dictionary with read counts per reference. :rtype: dict """ aln_iter_one = bam_common.pysam_open(aln_one, in_format) aln_iter_two = bam_common.pysam_open(aln_two, in_format) total = None if in_format == "BAM": total_one = aln_iter_one.mapped + aln_iter_two.unmapped total_two = aln_iter_two.mapped + aln_iter_two.unmapped if total_one != total_two: raise Exception( "The two input files ({} {}) have a different number of records!".format(aln_one, aln_two)) total = total_one # Comparison summary structure: stats = OrderedDict([ ('BamFiles', [aln_one, aln_two]), ('TotalQueries', 0), ('DirectionMismatch', 0), ('RefMismatch', 0), ('StrictFlagMismatch', 0), ('SeqMismatch', 0), ('CoarseMatches', 0), ('CommonAlignedBases', 0), ('CommonMatchingBases', 0), ('PerQueryBaseSim', []), ('PerQueryBaseSimClipped', []), (aln_one, {'HardClippedBases': 0, 'SoftClippedBases': 0, 'AlignedBases': 0, 'UnalignedQueries': 0, 'AlignedQueries': 0}), (aln_two, {'HardClippedBases': 0, 'SoftClippedBases': 0, 'AlignedBases': 0, 'UnalignedQueries': 0, 'AlignedQueries': 0}), ('AlignedSimilarity', 0.0), ]) records_iter = zip( aln_iter_one.fetch(until_eof=True), aln_iter_two.fetch(until_eof=True)) if verbose and in_format == "BAM": records_iter = tqdm.tqdm(records_iter, total=total) for segments in records_iter: aln_diff = compare_alignments(segments[0], segments[1], strict_flags) stats['TotalQueries'] += 1 # Register hard and soft clipped bases: stats[aln_one]['HardClippedBases'] += aln_diff['hard_clipped'][0] stats[aln_two]['HardClippedBases'] += aln_diff['hard_clipped'][1] stats[aln_one]['SoftClippedBases'] += aln_diff['soft_clipped'][0] stats[aln_two]['SoftClippedBases'] += aln_diff['soft_clipped'][1] # Both reads are aligned: if aln_diff['mapped'] == (True, True): stats[aln_one]['AlignedQueries'] += 1 stats[aln_two]['AlignedQueries'] += 1 # Reference mismatch: if aln_diff['ref_match'] is False: stats['RefMismatch'] = + 1 continue # Orientation mismatch: if aln_diff['dir_match'] is False: stats['DirectionMismatch'] += 1 continue # Flag mismatch: if aln_diff['flag_match'] is False: stats['StrictFlagMismatch'] += 1 continue # Sequence mismatch: if aln_diff['seq_match'] is False: stats['SeqMismatch'] += 1 stats['CommonAlignedBases'] += aln_diff['bases'] stats['CommonMatchingBases'] += aln_diff['cons_score'] stats['PerQueryBaseSim'].append( aln_diff['cons_score'] / float(aln_diff['bases'])) stats['PerQueryBaseSimClipped'].append(float( aln_diff['cons_score']) / min(segments[0].infer_query_length(), segments[1].infer_query_length())) if is_coarse_match(aln_diff, coarse_tolerance): stats['CoarseMatches'] += 1 stats[aln_one]['AlignedBases'] += aln_diff['bases'] stats[aln_two]['AlignedBases'] += aln_diff['bases'] # Read from first BAM is aligned: elif aln_diff['mapped'] == (True, False): stats[aln_one]['AlignedQueries'] += 1 stats[aln_one]['AlignedBases'] += aln_diff['bases_one'] stats[aln_two]['UnalignedQueries'] += 1 # Read from second BAM is aligned: elif aln_diff['mapped'] == (False, True): stats[aln_two]['AlignedQueries'] += 1 stats[aln_two]['AlignedBases'] += aln_diff['bases_two'] stats[aln_one]['UnalignedQueries'] += 1 # Both unaligned: elif aln_diff['mapped'] == (False, False): stats[aln_one]['UnalignedQueries'] += 1 stats[aln_two]['UnalignedQueries'] += 1 if stats['CommonAlignedBases'] > 0: stats['AlignedSimilarity'] = stats['CommonMatchingBases'] / \ float(stats['CommonAlignedBases']) else: stats['AlignedSimilarity'] = 0.0 return stats
[docs]def aligned_pairs_to_matches(aligned_pairs, offset): """Convert aligned pairs into a sequence of reference positions. :param aligned_pairs: Iterator of aligned pairs. :param offset: Offset at the beggining of the sequences. :returns: Iterator of reference positions aligned to the sequences positions. :rtype: generator """ ref_pos_iter = (pair[1] for pair in aligned_pairs if pair[0] is not None) return chain([False] * offset, ref_pos_iter)
[docs]def calc_consistency_score(segment_one, segment_two, offset_one, offset_two): """Calculate the number of bases aligned to the same reference bases in two alignments. :param segment_one: Pysam aligned segments. :param segment_two: Pysam aligned segments. :param offset_one: Hard clipping offset for the first alignment. :param offset_two: Hard clipping offset for the second alignment. :retruns: Number of matching base alignments. :rtype: int """ matches_one = aligned_pairs_to_matches( segment_one.get_aligned_pairs(), offset_one) matches_two = aligned_pairs_to_matches( segment_two.get_aligned_pairs(), offset_two) score = 0 for matches in zip_longest(matches_one, matches_two, fillvalue=False): if matches[0] == matches[1]: score += 1 return score
[docs]def get_hard_clip_offset(aln): """Get hard clipping offset from alignment. :param aln: Pysam aligned segment. :returns: Hard clipping offset. :rtype: int """ op = aln.cigartuples[0] if op[0] == 5: return op[1] return 0
[docs]def count_clipped(aln, target_op): """Count hard clipped bases in aligned segment. :param aln: Pysam aligned segement. :param target_op: CIGAR operation. :returns: Number of hard clipped bases in segment. :rtype: int """ if aln.is_unmapped: return 0 return sum(op[1] for op in (aln.cigartuples[0], aln.cigartuples[-1]) if op[0] == target_op)
[docs]def compare_alignments(segment_one, segment_two, strict_flags=False): """Count reads mapping to references in a BAM file. :param alignment_file: BAM file. :param min_aln_qual: Minimum mapping quality. :returns: Dictionary with read counts per reference. :rtype: dict """ # FIXME: comparison logic does not support multiple alignments, split # alignments # Check if query names match: if segment_one.query_name != segment_two.query_name: raise Exception("Mismatched query names: {} {} Note that these should match exactly!".format( segment_one.query_name, segment_two.query_name)) aln_diff = OrderedDict([ ('mapped', None), ('ref_match', None), ('dir_match', None), ('flag_match', None), ('seq_match', None), ('bases', None), ('start_pos', None), ('end_pos', None), ('hard_clipped', None), ('soft_clipped', None), ('cons_score', None), ]) aln_diff['mapped'] = ( not segment_one.is_unmapped, not segment_two.is_unmapped) # Count hard clipped bases: aln_diff['hard_clipped'] = ( count_clipped(segment_one, 5), count_clipped(segment_two, 5)) # Count soft clipped bases: aln_diff['soft_clipped'] = ( count_clipped(segment_one, 4), count_clipped(segment_two, 4)) # One or both unmapped: if aln_diff['mapped'] == (False, False): return aln_diff elif aln_diff['mapped'] == (True, False): aln_diff['bases_one'] = segment_one.infer_query_length() return aln_diff elif aln_diff['mapped'] == (False, True): aln_diff['bases_two'] = segment_two.infer_query_length() return aln_diff # Mismatch in reference name: if segment_one.reference_name != segment_two.reference_name: aln_diff['ref_match'] = False return aln_diff else: aln_diff['ref_match'] = True # Mismatch in orientation: if segment_one.is_reverse != segment_two.is_reverse: aln_diff['dir_match'] = False return aln_diff else: aln_diff['dir_match'] = True # Perform strict flag checking: if strict_flags: if segment_one.flag != segment_two.flag: aln_diff['flag_match'] = False return aln_diff else: aln_diff['flag_match'] = True # Check if read sequences differ: Now should work with hard clipping. if segment_one.query_sequence != segment_two.query_sequence: aln_diff['seq_match'] = False else: aln_diff['seq_match'] = True # Register number of bases: aln_diff['bases'] = max( segment_one.infer_query_length(), segment_two.infer_query_length()) # Register reference positions: aln_diff['start_pos'] = ( segment_one.reference_start, segment_two.reference_start) aln_diff['end_pos'] = ( segment_one.reference_end, segment_two.reference_end) # Figure out hard clipping offsets: offset_one = get_hard_clip_offset(segment_one) offset_two = get_hard_clip_offset(segment_two) # Register number of bases aligned the same way: aln_diff['cons_score'] = calc_consistency_score( segment_one, segment_two, offset_one, offset_two) return aln_diff