# -*- coding: utf-8 -*-
import numpy as np
import functools
from collections import namedtuple
from wub.util import seq as seq_util
uniform_probs = [0.25, 0.25, 0.25, 0.25]
strand_directions = ['+', '-']
MutatedSeq = namedtuple(
'MutatedSeq', 'seq real_qual real_subst real_del real_ins cigar')
cigar_operations = {'match': 'M', 'substitution': 'M', 'insertion': 'I', 'deletion': 'D'}
[docs]def sample_direction(forward_prob):
return np.random.choice(strand_directions, p=[forward_prob, 1 - forward_prob])
[docs]def random_base(probs=uniform_probs):
"""Generate a random DNA base.
:param probs: Probabilities of sampling a base, in the ACGT order.
:returns: A sampled base.
:rtype: str
"""
return np.random.choice(seq_util.bases, p=probs)
[docs]def random_base_except(excluded, probs=uniform_probs):
"""Generate a random base according to the specified probabilities with the exclusion of the specified base.
:param excluded: Exclude this base from sampling.
:param probs: Base sampling probabilities in the ACGT order.
:returns: A sampled base.
:rtype: str
"""
if len(probs) != len(seq_util.bases):
raise ValueError('Probability vector has wrong length!')
# Filter out excluded base:
bp_dict = dict((x, y)
for x, y in zip(seq_util.bases, probs) if x != excluded)
filtered_bases = list(bp_dict.keys())
norm_probs = np.array(list(bp_dict.values()), dtype=float)
# Re-normalise probabilities:
norm_probs = norm_probs / np.sum(norm_probs)
return np.random.choice(filtered_bases, p=norm_probs)
[docs]def simulate_sequence(length, probs=uniform_probs):
"""Simulate sequence of specified length and base composition.
:param length: Length of simulated sequence.
:param probs: Base composition vector in the ACGT order.
:returns: Simulated sequence.
:rtype: str
"""
return ''.join(np.random.choice(seq_util.bases, size=length, p=probs))
[docs]def sample_error_type(error_weights):
"""Sample error type from error weights dictionary.
:param error_weights: A dcitionary with (type, probability) pairs.
:returns: Error type
:rtype: str
"""
return np.random.choice(list(error_weights.keys()), p=list(error_weights.values()))
[docs]def cigar_list_to_string(cigar_list):
"""Sample error type from error weights dictionary.
:param error_weights: A dcitionary with (type, probability) pairs.
:returns: Error type
:rtype: str
"""
tmp = map(lambda x: str(x[0]) + str(x[1]), cigar_list)
return ''.join(tmp)
[docs]def compress_raw_cigar_list(raw_cigar):
"""Sample error type from error weights dictionary.
:param error_weights: A dcitionary with (type, probability) pairs.
:returns: Error type
:rtype: str
"""
raw_cigar[0] = [raw_cigar[0]]
def cigar_op_compose(a, b):
x = a.pop()
if x[1] == b[1]:
a.append((x[0] + b[0], x[1]))
else:
a.extend([x, b])
return a
cigar = functools.reduce(cigar_op_compose, raw_cigar)
return cigar
[docs]def simulate_sequencing_errors(sequence, error_rate, error_weights):
"""Simulate substitutions, deletions and insertions.
:param sequence: Input sequence.
:param error_rate: Total error rate.
:param error_weights: A dictionary with error types as keys and probabilities as values.
The possible error types are: substitution, deletion, insertion.
:returns: A named tuple with elements: mutated sequence, realised quality, number of realised substitutions,
number of realised deletions, number of realised insertions, cigar string.
:rtype: namedtuple
"""
if len(sequence) == 0:
raise Exception('Cannot simulate sequencing errors on empty sequence!')
new_bases = []
realised_substitutions = 0
realised_deletions = 0
realised_insertions = 0
raw_cigar_list = []
for position, base in enumerate(sequence):
if np.random.uniform() < error_rate:
error_type = sample_error_type(error_weights)
if error_type == 'substitution':
new_base = random_base_except(base)
realised_substitutions += 1
raw_cigar_list.append((1, cigar_operations[error_type]))
elif error_type == 'deletion':
new_base = ''
realised_deletions += 1
raw_cigar_list.append((1, cigar_operations[error_type]))
elif error_type == 'insertion':
new_base = base + random_base()
realised_insertions += 1
raw_cigar_list.append((1, cigar_operations['match']))
raw_cigar_list.append((1, cigar_operations[error_type]))
else:
raise Exception("Unhandled error type: {}".format(error_type))
else:
raw_cigar_list.append((1, cigar_operations['match']))
new_base = base
new_bases.append(new_base)
new_sequence = ''.join(new_bases)
cigar = cigar_list_to_string(compress_raw_cigar_list(raw_cigar_list))
realised_events = realised_substitutions + \
realised_deletions + realised_insertions
realised_quality = seq_util.prob_to_phred(
round(float(realised_events) / float(len(sequence)), 3))
mutated_record = MutatedSeq(
new_sequence, realised_quality, realised_substitutions, realised_deletions, realised_insertions, cigar)
return mutated_record
[docs]def add_errors(seq, nr_errors, error_type):
"""Introduce a specified number of errors in the target sequence at random positions.
:param seq: Input DNA sequence.
:param nr_errors: Number of mismatches to introduce.
:returns: Mutated sequence.
:rtype: str
"""
seq = list(seq)
positions = np.random.choice(np.arange(len(seq)), size=nr_errors, replace=False)
if error_type == 'substitution':
for pos in positions:
seq[pos] = random_base_except(seq[pos])
elif error_type == 'deletion':
for pos in positions:
seq[pos] = ''
elif error_type == 'insertion':
for pos in positions:
seq[pos] = seq[pos] + random_base()
else:
raise Exception('Invalid error type')
return ''.join(seq)