import gzip
import numpy as np
from collections import defaultdict
from textwrap import fill
## Functions to read in and print out VCF files
[docs]def read_vcf(vcf_file, ref_file=None):
"""
Reads in a vcf/vcf.gz file and associated
reference sequence fasta (to which the VCF file is mapped).
Parses mutations, insertions, and deletions and stores them in a nested dict,
see 'returns' for the dict structure.
Calls with heterozygous values 0/1, 0/2, etc and no-calls (./.) are
replaced with Ns at the associated sites.
Positions are stored to correspond the location in the reference sequence
in Python (numbering is transformed to start at 0)
Parameters
----------
vcf_file : string
Path to the vcf or vcf.gz file to be read in
ref_file : string, optional
Path to the fasta reference file to be read in
Returns
--------
compress_seq : nested dict
In the format: ::
{
'reference':'AGCTCGA..A',
'sequences': { 'seq1':{4:'A', 7:'-'}, 'seq2':{100:'C'} },
'insertions': { 'seq1':{4:'ATT'}, 'seq3':{1:'TT', 10:'CAG'} },
'positions': [1,4,7,10,100...]
}
references : string
String of the reference sequence read from the Fasta, to which
the variable sites are mapped
sequences : nested dict
Dict containing sequence names as keys which map to dicts
that have position as key and the single-base mutation (or deletion)
as values
insertions : nested dict
Dict in the same format as the above, which stores insertions and their
locations. The first base of the insertion is the same as whatever is
currently in that position (Ref if no mutation, mutation in 'sequences'
otherwise), so the current base can be directly replaced by the bases held here.
positions : list
Python list of all positions with a mutation, insertion, or deletion.
"""
#Programming Note:
# Note on VCF Format
# -------------------
# 'Insertion where there are also deletions' (special handling)
# Ex:
# REF ALT Seq1 Seq2
# GC GCC,G 1/1 2/2
# Insertions formatted differently - don't know how many bp match
# the Ref (unlike simple insert below). Could be mutations, also.
# 'Deletion'
# Ex:
# REF ALT
# GC G
# Alt does not have to be 1 bp - any length shorter than Ref.
# 'Insertion'
# Ex:
# REF ALT
# A ATT
# First base always matches Ref.
# 'No indel'
# Ex:
# REF ALT
# A G
#define here, so that all sub-functions can access them
sequences = defaultdict(dict)
insertions = defaultdict(dict) #Currently not used, but kept in case of future use.
#TreeTime handles 2-3 base ambig codes, this will allow that.
def getAmbigCode(bp1, bp2, bp3=""):
bps = [bp1,bp2,bp3]
bps.sort()
key = "".join(bps)
return {
'CT': 'Y',
'AG': 'R',
'AT': 'W',
'CG': 'S',
'GT': 'K',
'AC': 'M',
'AGT': 'D',
'ACG': 'V',
'ACT': 'H',
'CGT': 'B'
}[key]
#Parses a 'normal' (not hetero or no-call) call depending if insertion+deletion, insertion,
#deletion, or single bp subsitution
def parseCall(snps, ins, pos, ref, alt):
#Insertion where there are also deletions (special handling)
if len(ref) > 1 and len(alt)>len(ref):
for i in range(len(ref)):
#if the pos doesn't match, store in sequences
if ref[i] != alt[i]:
snps[pos+i] = (alt[i] if alt[i] != '.' else 'N') #'.' = no-call
#if about to run out of ref, store rest:
if (i+1) >= len(ref):
ins[pos+i] = alt[i:]
#Deletion
elif len(ref) > 1:
for i in range(len(ref)):
#if ref is longer than alt, these are deletion positions
if i+1 > len(alt):
snps[pos+i] = '-'
#if not, there may be mutations
else:
if ref[i] != alt[i]:
snps[pos+i] = (alt[i] if alt[i] != '.' else 'N') #'.' = no-call
#Insertion
elif len(alt) > 1:
ins[pos] = alt
#No indel
else:
snps[pos] = alt
#Parses a 'bad' (hetero or no-call) call depending on what it is
def parseBadCall(gen, snps, ins, pos, ref, ALT):
#Deletion
# REF ALT Seq1 Seq2 Seq3
# GCC G 1/1 0/1 ./.
# Seq1 (processed by parseCall, above) will become 'G--'
# Seq2 will become 'GNN'
# Seq3 will become 'GNN'
if len(ref) > 1:
#Deleted part becomes Ns
if gen[0] == '0' or gen[0] == '.':
if gen[0] == '0': #if het, get first bp
alt = str(ALT[int(gen[2])-1])
else: #if no-call, there is no alt, so just put Ns after 1st ref base
alt = ref[0]
for i in range(len(ref)):
#if ref is longer than alt, these are deletion positions
if i+1 > len(alt):
snps[pos+i] = 'N'
#if not, there may be mutations
else:
if ref[i] != alt[i]:
snps[pos+i] = (alt[i] if alt[i] != '.' else 'N') #'.' = no-call
#If not deletion, need to know call type
#if het, see if proposed alt is 1bp mutation
elif gen[0] == '0':
alt = str(ALT[int(gen[2])-1])
if len(alt)==1:
#alt = getAmbigCode(ref,alt) #if want to allow ambig
alt = 'N' #if you want to disregard ambig
snps[pos] = alt
#else a het-call insertion, so ignore.
#else it's a no-call; see if all alts have a length of 1
#(meaning a simple 1bp mutation)
elif len(ALT)==len("".join(ALT)):
alt = 'N'
snps[pos] = alt
#else a no-call insertion, so ignore.
#House code is *much* faster than pyvcf because we don't care about all info
#about coverage, quality, counts, etc, which pyvcf goes to effort to parse
#(and it's not easy as there's no standard ordering). Custom code can completely
#ignore all of this.
import gzip
from Bio import SeqIO
import numpy as np
nsamp = 0
posLoc = 0
refLoc = 0
altLoc = 0
sampLoc = 9
#Use different openers depending on whether compressed
opn = gzip.open if vcf_file.endswith(('.gz', '.GZ')) else open
with opn(vcf_file, mode='rt') as f:
for line in f:
if line[0] != '#':
#actual data - most common so first in 'if-list'!
dat = line.strip().split('\t')
POS = int(dat[posLoc])
REF = dat[refLoc]
ALT = dat[altLoc].split(',')
calls = np.array(dat[sampLoc:])
#get samples that differ from Ref at this site
recCalls = {}
for sname, sa in zip(samps, calls):
if ':' in sa: #if proper VCF file (followed by quality/coverage info)
gt = sa.split(':')[0]
else: #if 'pseudo' VCF file (nextstrain output, or otherwise stripped)
gt = sa
# convert haploid calls to pseudo diploid
if gt == '0':
gt = '0/0'
elif gt == '1':
gt = '1/1'
elif gt == '.':
gt = './.'
#ignore if ref call: '.' or '0/0', depending on VCF
if ('/' in gt and gt != '0/0') or ('|' in gt and gt != '0|0'):
recCalls[sname] = gt
#store the position and the alt
for seq, gen in recCalls.items():
ref = REF
pos = POS-1 #VCF numbering starts from 1, but Reference seq numbering
#will be from 0 because it's python!
#Accepts only calls that are 1/1, 2/2 etc. Rejects hets and no-calls
if gen[0] != '0' and gen[2] != '0' and gen[0] != '.' and gen[2] != '.':
alt = str(ALT[int(gen[0])-1]) #get the index of the alternate
if seq not in sequences.keys():
sequences[seq] = {}
parseCall(sequences[seq],insertions[seq], pos, ref, alt)
#If is heterozygote call (0/1) or no call (./.)
else:
#alt will differ here depending on het or no-call, must pass original
parseBadCall(gen, sequences[seq],insertions[seq], pos, ref, ALT)
elif line[0] == '#' and line[1] == 'C':
#header line, get all the information
header = line.strip().split('\t')
posLoc = header.index("POS")
refLoc = header.index('REF')
altLoc = header.index('ALT')
sampLoc = header.index('FORMAT')+1
samps = header[sampLoc:]
samps = [ x.strip() for x in samps ] #ensure no leading/trailing spaces
nsamp = len(samps)
#else you are a comment line, ignore.
#Gather all variable positions
positions = set()
for seq, muts in sequences.items():
positions.update(muts.keys())
#One or more seqs are same as ref! (No non-ref calls) So haven't been 'seen' yet
if nsamp > len(sequences):
missings = set(samps).difference(sequences.keys())
for s in missings:
sequences[s] = {}
if ref_file:
refSeq = SeqIO.read(ref_file, format='fasta')
refSeq = refSeq.upper() #convert to uppercase to avoid unknown chars later
refSeqStr = str(refSeq.seq)
else:
refSeqStr = None
compress_seq = {'reference':refSeqStr,
'sequences': sequences,
'insertions': insertions,
'positions': sorted(positions)}
return compress_seq
[docs]def write_vcf(tree_dict, file_name):#, compress=False):
"""
Writes out a VCF-style file (which seems to be minimally handleable
by vcftools and pyvcf) of the alignment. This is created from a dict
in a similar format to what's created by :py:meth:`treetime.vcf_utils.read_vcf`
Positions of variable sites are transformed to start at 1 to match
VCF convention.
Parameters
----------
tree_dict: nested dict
A nested dict with keys 'sequence' 'reference' and 'positions',
as is created by :py:meth:`treetime.TreeAnc.get_tree_dict`
file_name: str
File to which the new VCF should be written out. File names ending with
'.gz' will result in the VCF automatically being gzipped.
"""
# Programming Logic Note:
#
# For a sequence like:
# Pos 1 2 3 4 5 6
# Ref A C T T A C
# Seq1 A C - - - G
#
# In a dict it is stored:
# Seq1:{3:'-', 4:'-', 5:'-', 6:'G'} (Numbering from 1 for simplicity)
#
# In a VCF it needs to be:
# POS REF ALT Seq1
# 2 CTTA C 1/1
# 6 C G 1/1
#
# If a position is deleted (pos 3), need to get invariable position preceeding it
#
# However, in alternative case, the base before a deletion is mutant, so need to check
# that next position isn't a deletion (as otherwise won't be found until after the
# current single bp mutation is written out)
#
# When deleted position found, need to gather up all adjacent mutant positions with deletions,
# but not include adjacent mutant positions that aren't deletions (pos 6)
#
# Don't run off the 'end' of the position list if deletion is the last thing to be included
# in the VCF file
sequences = tree_dict['sequences']
ref = tree_dict['reference']
positions = tree_dict['positions']
def handleDeletions(i, pi, pos, ref, delete, pattern):
refb = ref[pi]
if delete: #Need to get the position before
i-=1 #As we'll next go to this position again
pi-=1
pos = pi+1
refb = ref[pi]
#re-get pattern
pattern = []
for k,v in sequences.items():
try:
pattern.append(sequences[k][pi])
except KeyError:
pattern.append(ref[pi])
pattern = np.array(pattern).astype('U')
sites = []
sites.append(pattern)
#Gather all positions affected by deletion - but don't run off end of position list
while (i+1) < len(positions) and positions[i+1] == pi+1:
i+=1
pi = positions[i]
pattern = []
for k,v in sequences.items():
try:
pattern.append(sequences[k][pi])
except KeyError:
pattern.append(ref[pi])
pattern = np.array(pattern).astype('U')
#Stops 'greedy' behaviour from adding mutations adjacent to deletions
if any(pattern == '-'): #if part of deletion, append
sites.append(pattern)
refb = refb+ref[pi]
else: #this is another mutation next to the deletion!
i-=1 #don't append, break this loop
#Rotate them into 'calls'
align = np.asarray(sites).T
#Get rid of '-', and put '.' for calls that match ref
#Only removes trailing '-'. This breaks VCF convension, but the standard
#VCF way of handling this* is really complicated, and the situation is rare.
#(*deletions and mutations at the same locations)
fullpat = []
for pt in align:
gp = len(pt)-1
while pt[gp] == '-':
pt[gp] = ''
gp-=1
pat = "".join(pt)
if pat == refb:
fullpat.append('.')
else:
fullpat.append(pat)
pattern = np.array(fullpat)
return i, pi, pos, refb, pattern
#prepare the header of the VCF & write out
header=["#CHROM","POS","ID","REF","ALT","QUAL","FILTER","INFO","FORMAT"]+list(sequences.keys())
opn = gzip.open if file_name.endswith(('.gz', '.GZ')) else open
out_file = opn(file_name, 'w')
out_file.write( "##fileformat=VCFv4.2\n"+
"##source=NextStrain\n"+
"##FORMAT=<ID=GT,Number=1,Type=String,Description=\"Genotype\">\n")
out_file.write("\t".join(header)+"\n")
vcfWrite = []
errorPositions = []
explainedErrors = 0
#Why so basic? Because we sometimes have to back up a position!
i=0
while i < len(positions):
#Get the 'pattern' of all calls at this position.
#Look out specifically for current (this pos) or upcoming (next pos) deletions
#But also distinguish these two, as handled differently.
pi = positions[i]
pos = pi+1 #change numbering to match VCF, not python, for output
refb = ref[pi] #reference base at this position
delete = False #deletion at this position - need to grab previous base (invariable)
deleteGroup = False #deletion at next position (mutation at this pos) - do not need to get prev base
#try/except is much more efficient than 'if' statements for constructing patterns,
#as on average a 'variable' location will not be variable for any given sequence
pattern = []
#pattern2 gets the pattern at next position to check for upcoming deletions
#it's more efficient to get both here rather than loop through sequences twice!
pattern2 = []
for k,v in sequences.items():
try:
pattern.append(sequences[k][pi])
except KeyError:
pattern.append(ref[pi])
try:
pattern2.append(sequences[k][pi+1])
except KeyError:
try:
pattern2.append(ref[pi+1])
except IndexError:
pass
pattern = np.array(pattern).astype('U')
pattern2 = np.array(pattern2).astype('U')
#If a deletion here, need to gather up all bases, and position before
if any(pattern == '-'):
if pos != 1:
deleteGroup = True
delete = True
else:
#If theres a deletion in 1st pos, VCF files do not handle this well.
#Proceed keeping it as '-' for alt (violates VCF), but warn user to check output.
#(This is rare)
print(fill("WARNING: You have a deletion in the first position of your"
" alignment. VCF format does not handle this well. Please check"
" the output to ensure it is correct."))
else:
#If a deletion in next pos, need to gather up all bases
if any(pattern2 == '-'):
deleteGroup = True
#If deletion, treat affected bases as 1 'call':
if delete or deleteGroup:
i, pi, pos, refb, pattern = handleDeletions(i, pi, pos, ref, delete, pattern)
#If no deletion, replace ref with '.', as in VCF format
else:
pattern[pattern==refb] = '.'
#Get the list of ALTs - minus any '.'!
uniques = np.unique(pattern)
uniques = uniques[np.where(uniques!='.')]
#Convert bases to the number that matches the ALT
j=1
for u in uniques:
pattern[np.where(pattern==u)[0]] = str(j)
j+=1
#Now convert these calls to #/# (VCF format)
calls = [ j+"/"+j if j!='.' else '.' for j in pattern ]
#What if there's no variation at a variable site??
#This can happen when sites are modified by TreeTime - see below.
printPos = True
if len(uniques)==0:
#If we expect it (it was made constant by TreeTime), it's fine.
if 'inferred_const_sites' in tree_dict and pi in tree_dict['inferred_const_sites']:
explainedErrors += 1
printPos = False #and don't output position to the VCF
else:
#If we don't expect, raise an error
errorPositions.append(str(pi))
#Write it out - Increment positions by 1 so it's in VCF numbering
#If no longer variable, and explained, don't write it out
if printPos:
output = ["MTB_anc", str(pos), ".", refb, ",".join(uniques), ".", "PASS", ".", "GT"] + calls
vcfWrite.append("\t".join(output))
i+=1
#Note: The number of 'inferred_const_sites' passed back by TreeTime will often be longer
#than the number of 'site that were made constant' that prints below. This is because given the site:
# Ref Alt Seq
# G A AANAA
#This will be converted to 'AAAAA' and listed as an 'inferred_const_sites'. However, for VCF
#purposes, because the site is 'variant' against the ref, it is variant, as expected, and so
#won't be counted in the below list, which is only sites removed from the VCF.
if 'inferred_const_sites' in tree_dict and explainedErrors != 0:
print(fill("Sites that were constant except for ambiguous bases were made" +
" constant by TreeTime. This happened {} times. These sites are".format(explainedErrors) +
" now excluded from the VCF."))
if len(errorPositions) != 0:
print ("\n***WARNING: vcf_utils.py")
print(fill("\n{} sites were found that had no alternative bases.".format(str(len(errorPositions)))+
" If this data has been run through TreeTime and contains ambiguous bases,"
" try calling get_tree_dict with var_ambigs=True to see if this clears the error."))
print(fill("\nAlternative causes:"
"\n- Not all sequences in your alignment are in the tree"
" (if you are running TreeTime via commandline this is most likely)"
"\n- In TreeTime, can be caused by overwriting variants in tips with small branch lengths (debug)"
"\n\nThese are the positions affected (numbering starts at 0):"))
print(fill(", ".join(errorPositions)))
out_file.write("\n".join(vcfWrite))
out_file.close()
def process_sparse_alignment(aln, ref, ambiguous_char):
return process_alignment_dictionary(aln, ref, ambiguous_char)
def process_alignment_dictionary(aln, ref, ambiguous_char):
"""
prepare the dictionary specifying differences from a reference sequence
to construct the reduced alignment with variable sites only. NOTE:
- sites can be constant but different from the reference
- sites can be constant plus a ambiguous sites
assigns
-------
- self.nonref_positions: at least one sequence is different from ref
Returns
-------
reduced_alignment_const
reduced alignment accounting for non-variable postitions
alignment_patterns_const
dict pattern -> (pos in reduced alignment, list of pos in full alignment)
variable_positions
list of variable positions needed to construct remaining
"""
# number of sequences in alignment
nseq = len(aln)
inv_map = defaultdict(list)
for k,v in aln.items():
for pos, bs in v.items():
inv_map[pos].append(bs)
nonref_positions = np.sort(list(inv_map.keys()))
constant_up_to_ambiguous = []
nonref_const = []
nonref_alleles = []
ambiguous_const = []
variable_pos = []
for pos, bs in inv_map.items(): #loop over positions and patterns
bases = list(np.unique(bs))
if len(bs) == nseq: #every sequence is different from reference
if (len(bases)<=2 and ambiguous_char in bases) or len(bases)==1:
# all sequences different from reference, but only one state
# (other than ambiguous_char) in column
nonref_const.append(pos)
if len(bases)==1:
nonref_alleles.append(bases[0])
else:
nonref_alleles.append([x for x in bases if x!=ambiguous_char][0])
if ambiguous_char in bases: #keep track of sites 'made constant'
constant_up_to_ambiguous.append(pos)
else:
# at least two non-reference alleles
variable_pos.append(pos)
else: # not every sequence different from reference
if len(bases)==1 and bases[0]==ambiguous_char:
ambiguous_const.append(pos)
constant_up_to_ambiguous.append(pos) #keep track of sites 'made constant'
else:
# at least one non ambiguous non-reference allele not in
# every sequence
variable_pos.append(pos)
refMod = np.copy(ref)
# place constant non reference positions by their respective allele
refMod[nonref_const] = nonref_alleles
# mask variable positions
states = np.unique(refMod)
refMod[variable_pos] = '.'
# for each base in the gtr, make constant alignment pattern and
# assign it to all const positions in the modified reference sequence
constant_columns = []
constant_patterns = {}
for base in states:
if base==ambiguous_char:
continue
p = np.repeat(base, nseq)
pos = list(np.where(refMod==base)[0])
#if the alignment doesn't have a const site of this base, don't add! (ex: no '----' site!)
if len(pos):
constant_patterns["".join(p.astype('U'))] = [len(constant_columns), pos]
constant_columns.append(p)
return {"constant_columns": constant_columns,
"constant_patterns": constant_patterns,
"variable_positions": variable_pos,
"nonref_positions": nonref_positions,
"constant_up_to_ambiguous": constant_up_to_ambiguous}