Source code for ivy.chars.recon

#!/usr/bin/env python
from __future__ import absolute_import, division, print_function, unicode_literals

import math

import numpy as np

import ivy
from ivy.chars.expokit import cyexpokit
from ivy.chars import catpars

"""
Ancestor state reconstruction
"""

[docs]def anc_recon_cat(tree, chars, Q, p=None, pi="Equal", ars=None, nregime=1): """ Given tree, character states at tips, and transition matrix perform ancestor reconstruction for a discrete character. Perform downpass using mk function, then perform uppass. Return reconstructed states - including tips Args: tree (Node): Root node of a tree. All branch lengths must be greater than 0 (except root) chars (dict): Dict mapping character states to tip labels. Character states should be coded 0,1,2... Can also be a list with tip states in preorder sequence Q (np.array): Instantaneous rate matrix p (np.array): 3-D array of dimensions branch_number * nchar * nchar. Optional. Pre-allocated space for efficient calculations pi (str or np.array): Option to weight the root node by given values. Either a string containing the method or an array of weights. Weights should be given in order. Accepted methods of weighting root: Equal: Flat prior Equilibrium: Prior equal to stationary distribution of Q matrix Fitzjohn: Root states weighted by how well they explain the data at the tips. ars (dict): Dict of pre-allocated arrays to improve speed by avoiding creating and destroying new arrays. Can be created with create_ancrecon_ars function. Optional. nregime (int): If hidden-rates model is used, the number of regimes to consider in the reconstruction. Optional, defaults to 1 (no hidden rates) Returns: np.array: Array of nodes in preorder sequence containing marginal likelihoods. """ if type(chars) == dict: chars = [chars[l] for l in [n.label for n in tree.leaves()]] nchar = Q.shape[0] if ars is None: # Creating arrays to be used later ars = create_ancrecon_ars(tree, chars, nregime) # Calculating the likelihoods for each node in post-order sequence if p is None: # Instantiating empty array p = np.empty([len(ars["t"]), Q.shape[0], Q.shape[1]], dtype = np.double, order="C") # Creating probability matrices from Q matrix and branch lengths cyexpokit.dexpm_tree_preallocated_p(Q, ars["t"], p) # This changes p in place np.copyto(ars["down_nl_w"], ars["down_nl_r"]) # Copy original values if they have been changed ars["child_inds"].fill(0) root_equil = ivy.chars.mk.qsd(Q) cyexpokit.cy_anc_recon(p, ars["down_nl_w"], ars["charlist"], ars["childlist"], ars["up_nl"], ars["marginal_nl"], ars["partial_parent_nl"], ars["partial_nl"], ars["child_inds"], root_equil,ars["temp_dotprod"], nregime) return ars["marginal_nl"]
[docs]def anc_recon_mkmr(root,chars,Q,switchpoint): nregime = Q.shape[0] root_copy = root.copy() for node in root_copy.descendants(): node.meta["cached"]=False node.bisect_branch(1e-15,reindex=False) root_copy.reindex() root_copy.set_iternode_cache() for node in root_copy.iternodes(): node.meta["cached"] = True nchar = len(set(chars)) ars = create_ancrecon_ars(tree, chars, nregime)
[docs]def create_ancrecon_ars(tree, chars, nregime = 1): """ Create nodelists. For use in recon function Returns edgelist of nodes in postorder sequence, edgelist of nodes in preorder sequence, partial likelihood vector for each node's children, childlist, and branch lengths. """ if type(chars) == dict: chars = [chars[l] for l in [n.label for n in tree.leaves()]] t = np.array([node.length for node in tree.postiter() if not node.isroot], dtype=np.double) nobschar = len(set(chars)) nchar = nobschar * nregime preleaves = [ n for n in tree.preiter() if n.isleaf ] postleaves = [n for n in tree.postiter() if n.isleaf ] postnodes = list(tree.postiter()) prenodes = list(tree.preiter()) postChars = [ chars[i] for i in [ preleaves.index(n) for n in postleaves ] ] nnode = len(t)+1 down_nl = np.zeros([nnode, (nchar)+1]) up_nl = np.zeros([nnode, (nchar)+2]) childlist = np.zeros(nnode, dtype=object) leafind = [ n.isleaf for n in tree.postiter()] # --------- Down nodelist(nl), postorder sequence for k,ch in enumerate(postChars): # Setting initial tip likelihoods to 1 or 0 # TODO: Switch to log calculations for r in range(nregime): [ n for i,n in enumerate(down_nl) if leafind[i] ][k][ch+(r*nobschar)] = 1.0 for i,n in enumerate(down_nl[:-1]): n[nchar] = postnodes.index(postnodes[i].parent) childlist[i] = [ nod.pi for nod in postnodes[i].children ] childlist[i+1] = [ nod.pi for nod in postnodes[i+1].children ] # Setting initial node likelihoods to one for calculations down_nl[[ i for i,b in enumerate(leafind) if not b],:-1] = 1.0 # -------- Up nl, preorder sequence up_nl[:,:-1].fill(1) for i,n in enumerate(up_nl[1:]): n[nchar] = postnodes.index(prenodes[1:][i].parent) n[nchar+1] = postnodes.index(prenodes[1:][i]) # -------- Marginal nl, also preorder sequence marginal_nl = up_nl.copy() # ------- Partial likelihood vectors partial_nl = np.zeros([nnode,max([n.nchildren for n in tree]),nchar]) # ------- Parent partial likelihood vectors partial_parent_nl = np.zeros([up_nl.shape[0],nchar]) # ------- Child masking arrays child_inds = np.zeros(partial_nl.shape[0], dtype=int) # ------- Character list charlist = list(range(nchar)) # ------- Empty array to store root priors root_priors = np.empty([nchar], dtype=np.double) # ------- Temporary array to store dot products temp_dotprod = np.empty([nchar], dtype=np.double) names = ["down_nl_r","down_nl_w","up_nl","marginal_nl", "partial_nl","partial_parent_nl","child_inds", "childlist","t","charlist","root_priors","temp_dotprod"] ar_list = [down_nl,down_nl.copy(),up_nl,marginal_nl,partial_nl,partial_parent_nl,child_inds, childlist,t,charlist,root_priors,temp_dotprod] ar_dict = {name:ar for name,ar in zip(names, ar_list)} return ar_dict
[docs]def parsimony_recon(tree, chars): """ Use parsimony to reconstruct the minimum number of changes needed to observe states at the tips. Equal weights assumed Args: tree (Node): Root node of tree chars (dict): Dict mapping character states to tip labels. Character states should be coded 0,1,2... Can also be a list with tip states in preorder sequence Returns: np.array: Array of assigned states of interior nodes """ stepmatrix = catpars.default_costmatrix(len(set(chars.values()))) rec = catpars.ancstates(tree, chars, stepmatrix) for k in list(rec.keys()): rec[k] = rec[k][0] return rec
[docs]def pscore(tree, chars): """ Return the minimum number of changes needed by parsimony to observe given data on the tree Args: tree (Node): Root node of the tree chars (dict): Dict mapping character states to tip labels. Character states should be coded 0,1,2... """ if len(set(chars)) == 1: return 0 precon = parsimony_recon(tree, chars) minp = 0.0 chars = [chars[l] for l in [n.label for n in tree.leaves()]] for node in tree: if not node.isleaf: ch = get_childstates(node, precon, chars) pa = precon[node] minp += average_nchanges(ch, pa) return minp
[docs]def get_childstates(node, precon, chars): """ For use in pscore function """ chstates = [None] * len(node.children) for i,n in enumerate(node.children): if n.isleaf: chstates[i] = chars[n.li] else: chstates[i] = precon[n] if not hasattr(chstates[i], "__iter__"): chstates[i] = [chstates[i]] return chstates
[docs]def average_nchanges(ch, pa): """ For use in pscore function """ lp = 1.0/len(pa) nchanges = 0 for p in pa: for c in ch: lc = 1.0/len(c) for s in c: if s != p: nchanges += lp * lc return nchanges