from collections import defaultdict
import numpy as np
from . import config as ttconf, TreeTimeError, MissingDataError
from .seq_utils import alphabets, profile_maps, alphabet_synonyms
def avg_transition(W,pi, gap_index=None):
if gap_index is None:
return np.einsum('i,ij,j', pi, W, pi)
else:
return (np.einsum('i,ij,j', pi, W, pi) - np.sum(pi*W[:,gap_index])*pi[gap_index])/(1-pi[gap_index])
[docs]class GTR(object):
"""
Defines General-Time-Reversible model of character evolution.
"""
[docs] def __init__(self, alphabet='nuc', prof_map=None, logger=None):
"""
Initialize empty evolutionary model.
Parameters
----------
alphabet : str, numpy.array
Alphabet of the sequence. If a string is passed, it is understood as
an alphabet name. In this case, the alphabet and its profile map are pulled
from :py:obj:`treetime.seq_utils`.
If a numpy array of characters is passed, a new alphabet is constructed,
and the default profile map is atached to it.
prof_map : dict
Dictionary linking characters in the sequence to the likelihood
of observing characters in the alphabet. This is used to
implement ambiguous characters like 'N'=[1,1,1,1] which are
equally likely to be any of the 4 nucleotides. Standard profile_maps
are defined in file seq_utils.py.
logger : callable
Custom logging function that should take arguments (msg, level, warn=False),
where msg is a string and level an integer to be compared against verbose.
"""
self.debug=False
self.is_site_specific=False
if isinstance(alphabet, str):
if alphabet not in alphabet_synonyms:
raise AttributeError("Unknown alphabet type specified")
else:
tmp_alphabet = alphabet_synonyms[alphabet]
self.alphabet = alphabets[tmp_alphabet]
self.profile_map = profile_maps[tmp_alphabet]
else:
# not a predefined alphabet
self.alphabet = np.array(alphabet)
if prof_map is None: # generate trivial unambiguous profile map is none is given
self.profile_map = {s:x for s,x in zip(self.alphabet, np.eye(len(self.alphabet)))}
else:
self.profile_map = {x if type(x) is str else x:k for x,k in prof_map.items()}
self.state_index={s:si for si,s in enumerate(self.alphabet)}
self.state_index.update({s:si for si,s in enumerate(self.alphabet)})
if logger is None:
def logger_default(*args,**kwargs):
"""standard logging function if none provided"""
if self.debug:
print(*args)
self.logger = logger_default
else:
self.logger = logger
self.ambiguous = None
self.gap_index = None
self.n_states = len(self.alphabet)
self.assign_gap_and_ambiguous()
# init all matrices with dummy values
self.logger("GTR: init with dummy values!", 3)
self.v = None # right eigenvectors
self.v_inv = None # left eigenvectors
self.eigenvals = None # eigenvalues
self.assign_rates()
def assign_gap_and_ambiguous(self):
n_states = len(self.alphabet)
self.logger("GTR: with alphabet: "+str([x for x in self.alphabet]),1)
# determine if a character exists that corresponds to no info, i.e. all one profile
if any([x.sum()==n_states for x in self.profile_map.values()]):
amb_states = [c for c,x in self.profile_map.items() if x.sum()==n_states]
self.ambiguous = 'N' if 'N' in amb_states else amb_states[0]
self.logger("GTR: ambiguous character: "+self.ambiguous,2)
else:
self.ambiguous=None
# check for a gap symbol
try:
self.gap_index = self.state_index['-']
except:
self.logger("GTR: no gap symbol!", 4, warn=True)
self.gap_index=None
@property
def mu(self):
return self._mu
@property
def Pi(self):
return self._Pi
@property
def W(self):
return self._W
@W.setter
def W(self, value):
self.assign_rates(mu=self.mu, pi=self.Pi, W=value)
@Pi.setter
def Pi(self, value):
self.assign_rates(mu=self.mu, pi=value, W=self.W)
@mu.setter
def mu(self, value):
self.assign_rates(mu=value, pi=self.Pi, W=self.W)
@property
def Q(self):
"""function that return the product of the transition matrix
and the equilibrium frequencies to obtain the rate matrix
of the GTR model
"""
Q_tmp = (self.W*self.Pi).T
Q_diag = -np.sum(Q_tmp, axis=0)
np.fill_diagonal(Q_tmp, Q_diag)
return Q_tmp
######################################################################
## constructor methods
######################################################################
def __str__(self):
'''
String representation of the GTR model for pretty printing
'''
multi_site = len(self.Pi.shape)==2
if multi_site:
eq_freq_str = "Average substitution rate (mu): "+str(np.round(self.average_rate,6))+'\n'
else:
eq_freq_str = "Substitution rate (mu): "+str(np.round(self.mu,6))+'\n'
if not multi_site:
eq_freq_str += "\nEquilibrium frequencies (pi_i):\n"
for a,p in zip(self.alphabet, self.Pi):
eq_freq_str+=' '+a+': '+str(np.round(p,4))+'\n'
W_str = "\nSymmetrized rates from j->i (W_ij):\n"
W_str+='\t'+'\t'.join(self.alphabet)+'\n'
for a,Wi in zip(self.alphabet, self.W):
W_str+= ' '+a+'\t'+'\t'.join([str(np.round(max(0,p),4)) for p in Wi])+'\n'
if not multi_site:
Q_str = "\nActual rates from j->i (Q_ij):\n"
Q_str+='\t'+'\t'.join(self.alphabet)+'\n'
for a,Qi in zip(self.alphabet, self.Q):
Q_str+= ' '+a+'\t'+'\t'.join([str(np.round(max(0,p),4)) for p in Qi])+'\n'
return eq_freq_str + W_str + Q_str
@staticmethod
def from_file(gtr_fname):
"""
Parse a GTR string and assign the rates accordingly.
Note that the input string is expected to be formatted exactly like the output of the `__str__` method.
Parameters
----------
gtr_fname : file name
String representation of the GTR model
"""
try:
with open(gtr_fname) as f:
alphabet = []
pi = []
while True:
line = f.readline()
if not line:
break
if line.strip().startswith("Substitution rate (mu):"):
mu = float(line.split(":")[1].strip())
elif line.strip().startswith("Equilibrium frequencies (pi_i):"):
line = f.readline()
while line.strip()!="":
alphabet.append(line.split(":")[0].strip())
pi.append(float(line.split(":")[1].strip()))
line = f.readline()
if not np.any([len(alphabet) == len(a) and np.all(np.array(alphabet) == a) for a in alphabets.values()]):
raise ValueError("GTR: was unable to read custom GTR model in "+str(gtr_fname) +" - Alphabet not recognized")
elif line.strip().startswith("Symmetrized rates from j->i (W_ij):"):
line = f.readline()
line = f.readline()
n = len(pi)
W = np.ones((n,n))
j = 0
while line.strip()!="":
values = line.split()
for i in range(n):
W[j,i] = float(values[i+1])
j +=1
line = f.readline()
if j != n:
raise ValueError("GTR: was unable to read custom GTR model in "+str(gtr_fname) +" - Number of lines in W matrix does not match alphabet length")
gtr = GTR.custom(mu, pi, W, alphabet = alphabet)
return gtr
except:
raise MissingDataError('GTR: was unable to read custom GTR model in '+str(gtr_fname))
[docs] def assign_rates(self, mu=1.0, pi=None, W=None):
"""
Overwrite the GTR model given the provided data
Parameters
----------
mu : float
Substitution rate
W : nxn matrix
Substitution matrix
pi : n vector
Equilibrium frequencies
"""
n = len(self.alphabet)
self._mu = mu
self.is_site_specific=False
if pi is not None and len(pi)==n:
Pi = np.array(pi)
else:
if pi is not None and len(pi)!=n:
self.logger("length of equilibrium frequency vector does not match alphabet length", 4, warn=True)
self.logger("Ignoring input equilibrium frequencies", 4, warn=True)
Pi = np.ones(shape=(n,))
self._Pi = Pi/np.sum(Pi)
if W is None or W.shape!=(n,n):
if (W is not None) and W.shape!=(n,n):
self.logger("Substitution matrix size does not match alphabet size", 4, warn=True)
self.logger("Ignoring input substitution matrix", 4, warn=True)
# flow matrix
W = np.ones((n,n))
np.fill_diagonal(W, 0.0)
np.fill_diagonal(W, - W.sum(axis=0))
else:
W=np.array(W)
self._W = 0.5*(W+W.T)
np.fill_diagonal(W,0)
average_rate = avg_transition(W, self.Pi, gap_index=self.gap_index)
self._W = W/average_rate
self._mu *=average_rate
self._eig()
[docs] @classmethod
def custom(cls, mu=1.0, pi=None, W=None, **kwargs):
"""
Create a GTR model by specifying the matrix explicitly
Parameters
----------
mu : float
Substitution rate
W : nxn matrix
Substitution matrix
pi : n vector
Equilibrium frequencies
**kwargs:
Key word arguments to be passed
Keyword Args
------------
alphabet : str
Specify alphabet when applicable. If the alphabet specification is
required, but no alphabet is specified, the nucleotide alphabet will be used as
default.
"""
gtr = cls(**kwargs)
gtr.assign_rates(mu=mu, pi=pi, W=W)
return gtr
[docs] @staticmethod
def standard(model, **kwargs):
"""
Create standard model of molecular evolution.
Parameters
----------
model : str
Model to create. See list of available models below
**kwargs:
Key word arguments to be passed to the model
**Available models**
- JC69:
Jukes-Cantor 1969 model. This model assumes equal frequencies
of the nucleotides and equal transition rates between nucleotide states.
For more info, see: Jukes and Cantor (1969).
Evolution of Protein Molecules. New York: Academic Press. pp. 21-132.
To create this model, use:
:code:`mygtr = GTR.standard(model='jc69', mu=<my_mu>, alphabet=<my_alph>)`
:code:`my_mu` - substitution rate (float)
:code:`my_alph` - alphabet (str: :code:`'nuc'` or :code:`'nuc_nogap'`)
- K80:
Kimura 1980 model. Assumes equal concentrations across nucleotides, but
allows different rates between transitions and transversions. The ratio
of the transversion/transition rates is given by kappa parameter.
For more info, see
Kimura (1980), J. Mol. Evol. 16 (2): 111-120. doi:10.1007/BF01731581.
Current implementation of the model does not account for the gaps.
:code:`mygtr = GTR.standard(model='k80', mu=<my_mu>, kappa=<my_kappa>)`
:code:`mu` - overall substitution rate (float)
:code:`kappa` - ratio of transversion/transition rates (float)
- F81:
Felsenstein 1981 model. Assumes non-equal concentrations across nucleotides,
but the transition rate between all states is assumed to be equal. See
Felsenstein (1981), J. Mol. Evol. 17 (6): 368-376. doi:10.1007/BF01734359
for details.
:code:`mygtr = GTR.standard(model='F81', mu=<mu>, pi=<pi>, alphabet=<alph>)`
:code:`mu` - substitution rate (float)
:code:`pi` - : nucleotide concentrations (numpy.array)
:code:`alphabet' - alphabet to use. (:code:`'nuc'` or :code:`'nuc_nogap'`)
- HKY85:
Hasegawa, Kishino and Yano 1985 model. Allows different concentrations of the
nucleotides (as in F81) + distinguishes between transition/transversion substitutions
(similar to K80). Link:
Hasegawa, Kishino, Yano (1985), J. Mol. Evol. 22 (2): 160-174. doi:10.1007/BF02101694
Current implementation of the model does not account for the gaps
:code:`mygtr = GTR.standard(model='HKY85', mu=<mu>, pi=<pi>, kappa=<kappa>)`
:code:`mu` - substitution rate (float)
:code:`pi` - : nucleotide concentrations (numpy.array)
:code:`kappa` - ratio of transversion/transition rates (float)
- T92:
Tamura 1992 model. Extending Kimura (1980) model for the case where a
G+C-content bias exists. Link:
Tamura K (1992), Mol. Biol. Evol. 9 (4): 678-687. DOI: 10.1093/oxfordjournals.molbev.a040752
Current implementation of the model does not account for the gaps
:code:`mygtr = GTR.standard(model='T92', mu=<mu>, pi_GC=<pi_gc>, kappa=<kappa>)`
:code:`mu` - substitution rate (float)
:code:`pi_GC` - : relative GC content
:code:`kappa` - ratio of transversion/transition rates (float)
- TN93:
Tamura and Nei 1993. The model distinguishes between the two different types of
transition: (A <-> G) is allowed to have a different rate to (C<->T).
Transversions have the same rate. The frequencies of the nucleotides are allowed
to be different. Link: Tamura, Nei (1993), MolBiol Evol. 10 (3): 512-526.
DOI:10.1093/oxfordjournals.molbev.a040023
:code:`mygtr = GTR.standard(model='TN93', mu=<mu>, kappa1=<k1>, kappa2=<k2>)`
:code:`mu` - substitution rate (float)
:code:`kappa1` - relative A<-->C, A<-->T, T<-->G and G<-->C rates (float)
:code:`kappa` - relative C<-->T rate (float)
.. Note::
Rate of A<-->G substitution is set to one. All other rates
(kappa1, kappa2) are specified relative to this rate
"""
from .nuc_models import JC69, K80, F81, HKY85, T92, TN93
from .aa_models import JTT92
if model.lower() in ['jc', 'jc69', 'jukes-cantor', 'jukes-cantor69', 'jukescantor', 'jukescantor69']:
model = JC69(**kwargs)
elif model.lower() in ['k80', 'kimura80', 'kimura1980']:
model = K80(**kwargs)
elif model.lower() in ['f81', 'felsenstein81', 'felsenstein1981']:
model = F81(**kwargs)
elif model.lower() in ['hky', 'hky85', 'hky1985']:
model = HKY85(**kwargs)
elif model.lower() in ['t92', 'tamura92', 'tamura1992']:
model = T92(**kwargs)
elif model.lower() in ['tn93', 'tamura_nei_93', 'tamuranei93']:
model = TN93(**kwargs)
elif model.lower() in ['jtt', 'jtt92']:
model = JTT92(**kwargs)
else:
raise KeyError("The GTR model '{}' is not in the list of available models."
"".format(model))
model.mu = kwargs['mu'] if 'mu' in kwargs else 1.0
return model
[docs] @classmethod
def random(cls, mu=1.0, alphabet='nuc', rng=None):
"""
Creates a random GTR model
Parameters
----------
mu : float
Substitution rate
alphabet : str
Alphabet name (should be standard: 'nuc', 'nuc_gap', 'aa', 'aa_gap')
"""
if rng is None:
rng = np.random.default_rng()
alphabet=alphabets[alphabet]
gtr = cls(alphabet)
n = gtr.alphabet.shape[0]
pi = 1.0*rng.randint(0,100,size=(n))
W = 1.0*rng.randint(0,100,size=(n,n)) # with gaps
gtr.assign_rates(mu=mu, pi=pi, W=W)
return gtr
[docs] @classmethod
def infer(cls, nij, Ti, root_state, fixed_pi=None, pc=1.0, gap_limit=0.01, **kwargs):
r"""
Infer a GTR model by specifying the number of transitions and time spent in each
character. The basic equation that is being solved is
:math:`n_{ij} = pi_i W_{ij} T_j`
where :math:`n_{ij}` are the transitions, :math:`pi_i` are the equilibrium
state frequencies, :math:`W_{ij}` is the "substitution attempt matrix",
while :math:`T_i` is the time on the tree spent in character state
:math:`i`. To regularize the process, we add pseudocounts and also need
to account for the fact that the root of the tree is in a particular
state. the modified equation is
:math:`n_{ij} + pc = pi_i W_{ij} (T_j+pc+root\_state)`
Parameters
----------
nij : nxn matrix
The number of times a change in character state is observed
between state j and i
Ti :n vector
The time spent in each character state
root_state : n vector
The number of characters in state i in the sequence
of the root node.
pc : float
Pseudocounts, this determines the lower cutoff on the rate when
no substitutions are observed
**kwargs:
Key word arguments to be passed
Keyword Args
------------
alphabet : str
Specify alphabet when applicable. If the alphabet specification
is required, but no alphabet is specified, the nucleotide alphabet will be used as default.
"""
from scipy import linalg as LA
gtr = cls(**kwargs)
gtr.logger("GTR: model inference ",1)
dp = 1e-5
Nit = 40
pc_mat = pc*np.ones_like(nij)
np.fill_diagonal(pc_mat, 0.0)
np.fill_diagonal(nij, 0.0)
count = 0
pi_old = np.zeros_like(Ti)
if fixed_pi is None:
pi = np.ones_like(Ti)
else:
pi = np.copy(fixed_pi)
pi/=pi.sum()
W_ij = np.ones_like(nij)
mu = (nij.sum()+pc)/(Ti.sum()+pc)
# if pi is fixed, this will immediately converge
while LA.norm(pi_old-pi) > dp and count < Nit:
gtr.logger(' '.join(map(str, ['GTR inference iteration',count,'change:',LA.norm(pi_old-pi)])), 3)
count += 1
pi_old = np.copy(pi)
W_ij = (nij+nij.T+2*pc_mat)/mu/(np.outer(pi,Ti) + np.outer(Ti,pi) + ttconf.TINY_NUMBER + 2*pc_mat)
np.fill_diagonal(W_ij, 0)
scale_factor = avg_transition(W_ij,pi, gap_index=gtr.gap_index)
W_ij = W_ij/scale_factor
if fixed_pi is None:
pi = (np.sum(nij+pc_mat,axis=1)+root_state)/(ttconf.TINY_NUMBER + mu*np.dot(W_ij,Ti)+root_state.sum()+np.sum(pc_mat, axis=1))
pi /= pi.sum()
mu = (nij.sum() + pc)/(np.sum(pi * (W_ij.dot(Ti)))+pc)
else:
mu = (nij.sum() + pc)/(np.sum(pi * (W_ij.dot(pi)))*Ti.sum() + pc)
if count >= Nit:
gtr.logger('WARNING: maximum number of iterations has been reached in GTR inference',3, warn=True)
if LA.norm(pi_old-pi) > dp:
gtr.logger('the iterative scheme has not converged',3,warn=True)
elif np.abs(1-np.max(pi.sum(axis=0))) > dp:
gtr.logger('the iterative scheme has converged, but proper normalization was not reached',3,warn=True)
if gtr.gap_index is not None:
if pi[gtr.gap_index]<gap_limit:
gtr.logger('The model allows for gaps which are estimated to occur at a low fraction of %1.3e'%pi[gtr.gap_index]+
' this can potentially result in artificats.'+
' gap fraction will be set to %1.4f'%gap_limit,2,warn=True)
pi[gtr.gap_index] = gap_limit
pi /= pi.sum()
gtr.assign_rates(mu=mu, W=W_ij, pi=pi)
return gtr
########################################################################
### prepare model
########################################################################
def _eig(self):
"""
Perform eigendecompositon of the rate matrix and stores the left- and right-
matrices to convert the sequence profiles to the GTR matrix eigenspace
and hence to speed-up the computations.
"""
self.eigenvals, self.v, self.v_inv = self._eig_single_site(self.W, self.Pi)
def _eig_single_site(self, W, p):
"""
Perform eigendecompositon of the rate matrix and stores the left- and right-
matrices to convert the sequence profiles to the GTR matrix eigenspace
and hence to speed-up the computations.
NOTE: this assumes the diagonal of W is all zeros
"""
# eigendecomposition of the rate matrix
assert np.abs(np.diag(W).sum())<1e-10
tmpp = np.sqrt(p)
symQ = W*np.outer(tmpp, tmpp)
np.fill_diagonal(symQ, -np.sum(W*p, axis=1))
eigvals, eigvecs = np.linalg.eigh(symQ)
tmp_v = eigvecs.T*tmpp
one_norm = np.sum(np.abs(tmp_v), axis=1)
return eigvals, tmp_v.T/one_norm, (eigvecs*one_norm).T/tmpp
[docs] def state_pair(self, seq_p, seq_ch, pattern_multiplicity=None,
ignore_gaps=False):
'''
Make a compressed representation of a pair of sequences, only counting
the number of times a particular pair of states (e.g. (A,T)) is observed
in the aligned sequences of parent and child.
Parameters
----------
seq_p: numpy array
Parent sequence as numpy array of chars
seq_ch: numpy array
Child sequence as numpy array of chars
pattern_multiplicity : numpy array
If sequences are reduced by combining identical alignment patterns,
these multplicities need to be accounted for when counting the number
of mutations across a branch. If None, all pattern are assumed to
occur exactly once.
ignore_gap: bool
Whether or not to include gapped positions of the alignment
in the multiplicity count
Returns
-------
seq_pair : list
:code:`[(0,1), (2,2), (3,4)]` list of parent_child state pairs
as indices in the alphabet
multiplicity : numpy array
Number of times a particular pair is observed
'''
if pattern_multiplicity is None:
pattern_multiplicity = np.ones_like(seq_p, dtype=float)
from collections import Counter
if seq_ch.shape != seq_p.shape:
raise ValueError("GTR.state_pair: Sequence lengths do not match!")
if len(self.alphabet)<10: # for small alphabet, repeatedly check array for all state pairs
pair_count = []
bool_seqs_p = []
bool_seqs_ch = []
for seq, bs in [(seq_p,bool_seqs_p), (seq_ch, bool_seqs_ch)]:
for ni,nuc in enumerate(self.alphabet):
bs.append(seq==nuc)
for n1,nuc1 in enumerate(self.alphabet):
if (self.gap_index is None) or (not ignore_gaps) or (n1!=self.gap_index):
for n2,nuc2 in enumerate(self.alphabet):
if (self.gap_index is None) or (not ignore_gaps) or (n2!=self.gap_index):
count = ((bool_seqs_p[n1]&bool_seqs_ch[n2])*pattern_multiplicity).sum()
if count: pair_count.append(((n1,n2), count))
else: # enumerate state pairs of the sequence for large alphabets
num_seqs = []
for seq in [seq_p, seq_ch]: # for each sequence (parent and child) construct a numerical sequence [0,5,3,1,2,3...]
tmp = np.ones_like(seq, dtype=int)
for ni,nuc in enumerate(self.alphabet):
tmp[seq==nuc] = ni # set each position corresponding to a state to the corresponding index
num_seqs.append(tmp)
pair_count = defaultdict(int)
if ignore_gaps: # if gaps are ignored skip positions where one or the other sequence is gapped
for i in range(len(seq_p)):
if self.gap_index!=num_seqs[0][i] and self.gap_index!=num_seqs[1][i]:
pair_count[(num_seqs[0][i],num_seqs[1][i])]+=pattern_multiplicity[i]
else: # otherwise, just count
for i in range(len(seq_p)):
pair_count[(num_seqs[0][i],num_seqs[1][i])]+=pattern_multiplicity[i]
pair_count = pair_count.items()
return (np.array([x[0] for x in pair_count], dtype=int), # [(child_nuc, parent_nuc),()...]
np.array([x[1] for x in pair_count], dtype=int)) # multiplicity of each parent/child nuc pair
########################################################################
### evolution functions
########################################################################
[docs] def prob_t_compressed(self, seq_pair, multiplicity, t, return_log=False):
'''
Calculate the probability of observing a sequence pair at a distance t,
for compressed sequences
Parameters
----------
seq_pair : numpy array
:code:`np.array([(0,1), (2,2), ()..])` as indicies of
pairs of aligned positions. (e.g. 'A'==0, 'C'==1 etc).
This only lists all occuring parent-child state pairs, order is irrelevant
multiplicity : numpy array
The number of times a parent-child state pair is observed.
This allows compression of the sequence representation
t : float
Length of the branch separating parent and child
return_log : bool
Whether or not to exponentiate the result
'''
if t<0:
logP = -ttconf.BIG_NUMBER
else:
tmp_eQT = self.expQt(t)
logQt = np.log(np.maximum(tmp_eQT, ttconf.SUPERTINY_NUMBER))
# logQt[~np.isfinite(logQt)] = -ttconf.BIG_NUMBER
logP = np.sum(logQt[seq_pair[:,1], seq_pair[:,0]]*multiplicity)
return logP if return_log else np.exp(logP)
[docs] def prob_t(self, seq_p, seq_ch, t, pattern_multiplicity = None,
return_log=False, ignore_gaps=True):
"""
Compute the probability to observe seq_ch (child sequence) after time t starting from seq_p
(parent sequence).
Parameters
----------
seq_p : character array
Parent sequence
seq_c : character array
Child sequence
t : double
Time (branch len) separating the profiles.
pattern_multiplicity : numpy array
If sequences are reduced by combining identical alignment patterns,
these multplicities need to be accounted for when counting the number
of mutations across a branch. If None, all pattern are assumed to
occur exactly once.
return_log : bool
It True, return log-probability.
Returns
-------
prob : np.array
Resulting probability
"""
seq_pair, multiplicity = self.state_pair(seq_p, seq_ch,
pattern_multiplicity=pattern_multiplicity, ignore_gaps=ignore_gaps)
return self.prob_t_compressed(seq_pair, multiplicity, t, return_log=return_log)
[docs] def optimal_t(self, seq_p, seq_ch, pattern_multiplicity=None, ignore_gaps=False):
'''
Find the optimal distance between the two sequences
Parameters
----------
seq_p : character array
Parent sequence
seq_c : character array
Child sequence
pattern_multiplicity : numpy array
If sequences are reduced by combining identical alignment patterns,
these multplicities need to be accounted for when counting the number
of mutations across a branch. If None, all pattern are assumed to
occur exactly once.
ignore_gaps : bool
If True, ignore gaps in distance calculations
'''
seq_pair, multiplicity = self.state_pair(seq_p, seq_ch,
pattern_multiplicity = pattern_multiplicity,
ignore_gaps=ignore_gaps)
return self.optimal_t_compressed(seq_pair, multiplicity)
[docs] def optimal_t_compressed(self, seq_pair, multiplicity, profiles=False, tol=1e-10):
"""
Find the optimal distance between the two sequences represented as state_pairs
or as pair of profiles
Parameters
----------
seq_pair : state_pair, tuple
Compressed representation of sequences along a branch, either
as tuple of state pairs or as tuple of profiles.
multiplicity : array
Number of times each state pair in seq_pair appears (if profile==False)
Number of times an alignment pattern is observed (if profiles==True)
profiles : bool, default False
The standard branch length optimization assumes fixed sequences at
either end of the branch. With profiles==True, optimization is performed
while summing over all possible states of the nodes at either end of the
branch. Note that the meaning/format of seq_pair and multiplicity
depend on the value of :profiles:.
"""
def _neg_prob(t, seq_pair, multiplicity):
"""
Probability to observe a child given the the parent state, transition
matrix, and the time of evolution (branch length).
Parameters
----------
t : double
Branch length (time between sequences)
seq_pair : tuple of profiles
parent and child sequences
multiplicity : vector containing the number of times each alignment pattern is observed
Returns
-------
prob : double
Negative probability of the two given sequences
to be separated by the time t.
"""
if profiles:
res = -1.0*self.prob_t_profiles(seq_pair, multiplicity,t**2, return_log=True)
return res + np.exp(t**4/10000)
else:
return -1.0*self.prob_t_compressed(seq_pair, multiplicity,t**2, return_log=True)
hamming_distance = np.sum(multiplicity[seq_pair[:,1]!=seq_pair[:,0]])/np.sum(multiplicity)
try:
from scipy.optimize import minimize_scalar
opt = minimize_scalar(_neg_prob,
bracket=[-np.sqrt(ttconf.MAX_BRANCH_LENGTH), np.sqrt(hamming_distance), np.sqrt(ttconf.MAX_BRANCH_LENGTH)],
args=(seq_pair, multiplicity), tol=tol, method='brent')
new_len = opt["x"]**2
if 'success' not in opt:
opt['success'] = True
self.logger("WARNING: the optimization result does not contain a 'success' flag:"+str(opt),4, warn=True)
except ImportError:
import scipy
print('legacy scipy', scipy.__version__)
from scipy.optimize import fminbound
new_len = fminbound(_neg_prob,
-np.sqrt(ttconf.MAX_BRANCH_LENGTH),np.sqrt(ttconf.MAX_BRANCH_LENGTH),
args=(seq_pair, multiplicity))
new_len = new_len**2
opt={'success':True}
if new_len > .9 * ttconf.MAX_BRANCH_LENGTH:
self.logger("WARNING: GTR.optimal_t_compressed -- The branch length seems to be very long!", 4, warn=True)
if opt["success"] != True:
# return hamming distance: number of state pairs where state differs/all pairs
new_len = hamming_distance
return new_len
[docs] def prob_t_profiles(self, profile_pair, multiplicity, t,
return_log=False, ignore_gaps=True):
'''
Calculate the probability of observing a node pair at a distance t
Parameters
----------
profile_pair: numpy arrays
Probability distributions of the nucleotides at either
end of the branch. pp[0] = parent, pp[1] = child
multiplicity : numpy array
The number of times an alignment pattern is observed
t : float
Length of the branch separating parent and child
ignore_gaps: bool
If True, ignore mutations to and from gaps in distance calculations
return_log : bool
Whether or not to exponentiate the result
'''
if t<0:
logP = -ttconf.BIG_NUMBER
else:
Qt = self.expQt(t)
if len(Qt.shape)==3: # site specific GTR model
res = np.einsum('ai,ija,aj->a', profile_pair[1], Qt, profile_pair[0])
else:
res = np.einsum('ai,ij,aj->a', profile_pair[1], Qt, profile_pair[0])
if ignore_gaps and (self.gap_index is not None): # calculate the probability that neither outgroup/node has a gap
non_gap_frac = (1-profile_pair[0][:,self.gap_index])*(1-profile_pair[1][:,self.gap_index])
# weigh log LH by the non-gap probability
logP = np.sum(multiplicity*np.log(res+ttconf.SUPERTINY_NUMBER)*non_gap_frac)
else:
logP = np.sum(multiplicity*np.log(res+ttconf.SUPERTINY_NUMBER))
return logP if return_log else np.exp(logP)
[docs] def propagate_profile(self, profile, t, return_log=False):
"""
Compute the probability of the sequence state of the parent
at time (t+t0, backwards), given the sequence state of the
child (profile) at time t0.
Parameters
----------
profile : numpy.array
Sequence profile. Shape = (L, a),
where L - sequence length, a - alphabet size.
t : double
Time to propagate
return_log: bool
If True, return log-probability
Returns
-------
res : np.array
Profile of the sequence after time t in the past.
Shape = (L, a), where L - sequence length, a - alphabet size.
"""
Qt = self.expQt(t)
res = profile.dot(Qt)
return np.log(res) if return_log else res
def evolve(self, profile, t, return_log=False):
"""
Compute the probability of the sequence state of the child
at time t later, given the parent profile.
Parameters
----------
profile : numpy.array
Sequence profile. Shape = (L, a),
where L - sequence length, a - alphabet size.
t : double
Time to propagate
return_log: bool
If True, return log-probability
Returns
-------
res : np.array
Profile of the sequence after time t in the future.
Shape = (L, a), where L - sequence length, a - alphabet size.
"""
Qt = self.expQt(t).T
res = profile.dot(Qt)
return np.log(res) if return_log else res
def _exp_lt(self, t):
"""
Parameters
----------
t : float
time to propagate
Returns
--------
exp_lt : numpy.array
Array of values exp(lambda(i) * t),
where (i) - alphabet index (the eigenvalue number).
"""
log_val = self.mu * t * self.eigenvals
if any(i > 10 for i in log_val):
raise ValueError("Error in computing exp(Q * t): Q has positive eigenvalues or the branch length t is too large. "
"This is most likely caused by incorrect input data.")
return np.exp(log_val)
[docs] def expQt(self, t):
'''
Parameters
----------
t : float
Time to propagate
Returns
--------
expQt : numpy.array
Matrix exponential of exo(Qt)
'''
eLambdaT = np.diag(self._exp_lt(t)) # vector length = a
Qs = self.v.dot(eLambdaT.dot(self.v_inv)) # This is P(nuc1 | given nuc_2)
return np.maximum(0,Qs)
def expQs(self, s):
return self.expQt(s**2)
def expQsds(self, s):
r'''
Returns
-------
Qtds : Returns 2 V_{ij} \lambda_j s e^{\lambda_j s**2 } V^{-1}_{jk}
This is the derivative of the branch probability with respect to s=\sqrt(t)
'''
lambda_eLambdaT = np.diag(2.0*self._exp_lt(s**2)*self.eigenvals*s)
return self.v.dot(lambda_eLambdaT.dot(self.v_inv))
[docs] def sequence_logLH(self,seq, pattern_multiplicity=None):
"""
Returns the log-likelihood of sampling a sequence from equilibrium frequency.
Expects a sequence as numpy array
Parameters
----------
seq : numpy array
Compressed sequence as an array of chars
pattern_multiplicity : numpy_array
The number of times each position in sequence is observed in the
initial alignment. If None, sequence is assumed to be not compressed
"""
if pattern_multiplicity is None:
pattern_multiplicity = np.ones_like(seq, dtype=float)
return np.sum([np.sum((seq==state)*pattern_multiplicity*np.log(self.Pi[si]))
for si,state in enumerate(self.alphabet)])
def average_rate(self):
return self.mu*avg_transition(self.W, self.Pi, gap_index=self.gap_index)
def save_to_npz(self, outfile):
full_gtr = self.mu * np.dot(self.Pi, self.W)
desc=np.array(["GTR matrix description\n", "Substitution rate: " + str(self.mu)])
np.savez(outfile, description=desc,
full_gtr=full_gtr,
char_dist=self.Pi,
flow_matrix=self.W)
if __name__ == "__main__":
pass