msaexplorer.explore
Explore module
This module contains the two classes MSA and Annotation. They read in the respective files,
compute several alignment statistics and serve as input for the draw module functions.
Classes
1""" 2# Explore module 3 4This module contains the two classes `MSA` and `Annotation` which are used to read in the respective files 5and can be used to compute several statistics or be used as the input for the `draw` module functions. 6 7## Classes 8 9""" 10 11# built-in 12import os 13import io 14import math 15import collections 16import re 17from typing import Callable, Dict 18 19# installed 20import numpy as np 21from numpy import ndarray 22 23# msaexplorer 24from msaexplorer import config 25 26 27def _get_line_iterator(source): 28 """ 29 allow reading in both raw string or paths 30 """ 31 if isinstance(source, str) and os.path.exists(source): 32 return open(source, 'r') 33 else: 34 return io.StringIO(source) 35 36class MSA: 37 """ 38 An alignment class that allows computation of several stats 39 """ 40 41 def __init__(self, alignment_string: str, reference_id: str = None, zoom_range: tuple | int = None): 42 """ 43 Initialise an Alignment object. 44 :param alignment_string: Path to alignment file or raw alignment string 45 :param reference_id: reference id 46 :param zoom_range: start and stop positions to zoom into the alignment 47 """ 48 self._alignment = self._read_alignment(alignment_string) 49 self._reference_id = self._validate_ref(reference_id, self._alignment) 50 self._zoom = self._validate_zoom(zoom_range, self._alignment) 51 self._aln_type = self._determine_aln_type(self._alignment) 52 53 # TODO: read in different alignment types 54 # Static methods 55 @staticmethod 56 def _read_alignment(file_path: str) -> dict: 57 """ 58 Parse MSA alignment file. 59 :param file_path: path to alignment file 60 :return: dictionary with ids as keys and sequences as values 61 """ 62 63 def add_seq(aln: dict, sequence_id: str, seq_list: list): 64 """ 65 Add a complete sequence and check for non-allowed chars 66 :param aln: alignment dictionary to build 67 :param sequence_id: sequence id to add 68 :param seq_list: sequences to add 69 :return: alignment with added sequences 70 """ 71 final_seq = ''.join(seq_list).upper() 72 # Check for non-allowed characters 73 invalid_chars = set(final_seq) - set(config.POSSIBLE_CHARS) 74 if invalid_chars: 75 raise ValueError( 76 f"{sequence_id} contains invalid characters: {', '.join(invalid_chars)}. Allowed chars are: {config.POSSIBLE_CHARS}" 77 ) 78 aln[sequence_id] = final_seq 79 80 return aln 81 82 alignment, seq_lines = {}, [] 83 seq_id = None 84 85 with _get_line_iterator(file_path) as file: 86 for i, line in enumerate(file): 87 line = line.strip() 88 # initial check for fasta format 89 if i == 0 and not line.startswith(">"): 90 raise ValueError('Alignment has to be in fasta format starting with >SeqID.') 91 if line.startswith(">"): 92 if seq_id: 93 alignment = add_seq(alignment, seq_id, seq_lines) 94 # initialize a new sequence 95 seq_id, seq_lines = line[1:], [] 96 else: 97 seq_lines.append(line) 98 # handle last sequence 99 if seq_id: 100 alignment = add_seq(alignment, seq_id, seq_lines) 101 # final sanity checks 102 if alignment: 103 # alignment contains only one sequence: 104 if len(alignment) < 2: 105 raise ValueError("Alignment must contain more than one sequence.") 106 # alignment sequences are not same length 107 first_seq_len = len(next(iter(alignment.values()))) 108 for sequence_id, sequence in alignment.items(): 109 if len(sequence) != first_seq_len: 110 raise ValueError( 111 f"All alignment sequences must have the same length. Sequence '{sequence_id}' has length {len(sequence)}, expected {first_seq_len}." 

    @staticmethod
    def _validate_ref(reference: str | None, alignment: dict) -> str | None:
        """
        Validate if the ref seq is indeed part of the alignment.
        :param reference: reference seq id
        :param alignment: alignment dict
        :return: validated reference
        """
        if reference in alignment.keys():
            return reference
        elif reference is None:
            return reference
        else:
            raise ValueError('Reference not in alignment.')

    @staticmethod
    def _validate_zoom(zoom: tuple | int, original_aln: dict) -> tuple | None:
        """
        Validates if the user-defined zoom range is within the start and end of the initial
        alignment.\n
        :param zoom: zoom range or zoom start
        :param original_aln: non-zoomed alignment dict
        :return: validated zoom range
        """
        if zoom is not None:
            aln_length = len(original_aln[list(original_aln.keys())[0]])
            # check if only one value is provided -> stop is the alignment length
            if isinstance(zoom, int):
                if 0 <= zoom < aln_length:
                    return zoom, aln_length - 1
                else:
                    raise ValueError('Zoom start must be within the alignment length range.')
            # check if more than 2 values are provided
            if len(zoom) != 2:
                raise ValueError('Zoom positions have to be (zoom_start, zoom_end).')
            # validate zoom start/stop
            for position in zoom:
                if type(position) != int:
                    raise ValueError('Zoom positions have to be integers.')
                if position not in range(0, aln_length):
                    raise ValueError('Zoom position out of range.')

        return zoom

    @staticmethod
    def _determine_aln_type(alignment) -> str:
        """
        Determine the most likely type of alignment:
        if 70% of the chars in the alignment are nucleotide
        chars, it is most likely a nt alignment.
        :return: type of alignment
        """
        counter = 0
        for record in alignment:
            if 'U' in alignment[record]:
                return 'RNA'
            counter += sum(map(alignment[record].count, ['A', 'C', 'G', 'T', 'N', '-']))
        # determine which is the most likely type
        if counter / len(alignment) >= 0.7 * len(alignment[list(alignment.keys())[0]]):
            return 'DNA'
        else:
            return 'AA'

    # Properties with setters
    @property
    def reference_id(self):
        return self._reference_id

    @reference_id.setter
    def reference_id(self, ref_id: str):
        """
        Set and validate the reference id.
        """
        self._reference_id = self._validate_ref(ref_id, self.alignment)

    @property
    def zoom(self) -> tuple:
        return self._zoom

    @zoom.setter
    def zoom(self, zoom_pos: tuple | int):
        """
        Validate the user-defined zoom range.
        """
        self._zoom = self._validate_zoom(zoom_pos, self._alignment)

    # Property without setter
    @property
    def aln_type(self) -> str:
        """
        define the aln type:
        RNA, DNA or AA
        """
        return self._aln_type

    # On-the-fly properties without setters
    @property
    def length(self) -> int:
        return len(next(iter(self.alignment.values())))

    @property
    def alignment(self) -> dict:
        """
        (zoomed) version of the alignment.
        """
        if self.zoom is not None:
            zoomed_aln = dict()
            for seq in self._alignment:
                zoomed_aln[seq] = self._alignment[seq][self.zoom[0]:self.zoom[1]]
            return zoomed_aln
        else:
            return self._alignment
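
    # A usage sketch for the properties above (hypothetical file name; zoom and
    # reference behave as validated by the static methods):
    #   msa = MSA('example_alignment.fasta', reference_id=None, zoom_range=(100, 500))
    #   msa.reference_id = list(msa.alignment.keys())[0]  # must be an id in the alignment
    #   msa.zoom = 250  # int start -> zoom extends to the alignment end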
222 """ 223 if self.zoom is not None: 224 zoomed_aln = dict() 225 for seq in self._alignment: 226 zoomed_aln[seq] = self._alignment[seq][self.zoom[0]:self.zoom[1]] 227 return zoomed_aln 228 else: 229 return self._alignment 230 231 # functions for different alignment stats 232 def get_reference_coords(self) -> tuple[int, int]: 233 """ 234 Determine the start and end coordinates of the reference sequence 235 defined as the first/last nucleotide in the reference sequence 236 (excluding N and gaps). 237 238 :return: start, end 239 """ 240 start, end = 0, self.length 241 242 if self.reference_id is None: 243 return start, end 244 else: 245 # 5' --> 3' 246 for start in range(self.length): 247 if self.alignment[self.reference_id][start] not in ['-', 'N']: 248 break 249 # 3' --> 5' 250 for end in range(self.length - 1, 0, -1): 251 if self.alignment[self.reference_id][end] not in ['-', 'N']: 252 break 253 254 return start, end 255 256 def get_consensus(self, threshold: float = None, use_ambig_nt: bool = False) -> str: 257 """ 258 Creates a non-gapped consensus sequence. 259 260 :param threshold: Threshold for consensus sequence. If use_ambig_nt = True the ambig. char that encodes 261 the nucleotides that reach a cumulative frequency >= threshold is used. Otherwise 'N' (for nt alignments) 262 or 'X' (for as alignments) is used if none of the characters reach a cumulative frequency >= threshold. 263 :param use_ambig_nt: Use ambiguous character nt if none of the possible nt at a alignment position 264 has a frequency above the defined threshold. 265 :return: consensus sequence 266 """ 267 268 # helper functions 269 def determine_counts(alignment_dict: dict, position: int) -> dict: 270 """ 271 count the number of each char at 272 an idx of the alignment. return sorted dic. 273 handles ambiguous nucleotides in sequences. 274 also handles gaps. 
275 """ 276 nucleotide_list = [] 277 278 # get all nucleotides 279 for sequence in alignment_dict.items(): 280 nucleotide_list.append(sequence[1][position]) 281 # count occurences of nucleotides 282 counter = dict(collections.Counter(nucleotide_list)) 283 # get permutations of an ambiguous nucleotide 284 to_delete = [] 285 temp_dict = {} 286 for nucleotide in counter: 287 if nucleotide in config.AMBIG_CHARS[self.aln_type]: 288 to_delete.append(nucleotide) 289 permutations = config.AMBIG_CHARS[self.aln_type][nucleotide] 290 adjusted_freq = 1 / len(permutations) 291 for permutation in permutations: 292 if permutation in temp_dict: 293 temp_dict[permutation] += adjusted_freq 294 else: 295 temp_dict[permutation] = adjusted_freq 296 297 # drop ambiguous entries and add adjusted freqs to 298 if to_delete: 299 for i in to_delete: 300 counter.pop(i) 301 for nucleotide in temp_dict: 302 if nucleotide in counter: 303 counter[nucleotide] += temp_dict[nucleotide] 304 else: 305 counter[nucleotide] = temp_dict[nucleotide] 306 307 return dict(sorted(counter.items(), key=lambda x: x[1], reverse=True)) 308 309 def get_consensus_char(counts: dict, cutoff: float) -> list: 310 """ 311 get a list of nucleotides for the consensus seq 312 """ 313 n = 0 314 315 consensus_chars = [] 316 for char in counts: 317 n += counts[char] 318 consensus_chars.append(char) 319 if n >= cutoff: 320 break 321 322 return consensus_chars 323 324 def get_ambiguous_char(nucleotides: list) -> str: 325 """ 326 get ambiguous char from a list of nucleotides 327 """ 328 for ambiguous, permutations in config.AMBIG_CHARS[self.aln_type].items(): 329 if set(permutations) == set(nucleotides): 330 break 331 332 return ambiguous 333 334 # check if params have been set correctly 335 if threshold is not None: 336 if threshold < 0 or threshold > 1: 337 raise ValueError('Threshold must be between 0 and 1.') 338 if self.aln_type == 'AA' and use_ambig_nt: 339 raise ValueError('Ambiguous characters can not be calculated for amino acid alignments.') 340 if threshold is None and use_ambig_nt: 341 raise ValueError('To calculate ambiguous nucleotides, set a threshold > 0.') 342 343 alignment = self.alignment 344 consensus = str() 345 346 if threshold is not None: 347 consensus_cutoff = len(alignment) * threshold 348 else: 349 consensus_cutoff = 0 350 351 # built consensus sequences 352 for idx in range(self.length): 353 char_counts = determine_counts(alignment, idx) 354 consensus_chars = get_consensus_char( 355 char_counts, 356 consensus_cutoff 357 ) 358 if threshold != 0: 359 if len(consensus_chars) > 1: 360 if use_ambig_nt: 361 char = get_ambiguous_char(consensus_chars) 362 else: 363 if self.aln_type == 'AA': 364 char = 'X' 365 else: 366 char = 'N' 367 consensus = consensus + char 368 else: 369 consensus = consensus + consensus_chars[0] 370 else: 371 consensus = consensus + consensus_chars[0] 372 373 return consensus 374 375 def get_conserved_orfs(self, min_length: int = 100, identity_cutoff: float | None = None) -> dict: 376 """ 377 **conserved ORF definition:** 378 - conserved starts and stops 379 - start, stop must be on the same frame 380 - stop - start must be at least min_length 381 - all ungapped seqs[start:stop] must have at least min_length 382 - no ungapped seq can have a Stop in between Start Stop 383 384 Conservation is measured by number of positions with identical characters divided by 385 orf slice of the alignment. 

    def get_conserved_orfs(self, min_length: int = 100, identity_cutoff: float | None = None) -> dict:
        """
        **conserved ORF definition:**

        - conserved starts and stops
        - start and stop must be in the same frame
        - stop - start must be at least min_length
        - all ungapped seqs[start:stop] must have at least min_length
        - no ungapped seq can have a stop in between start and stop

        Conservation is measured by the number of positions with identical characters divided by
        the length of the ORF slice of the alignment.

        **Algorithm overview:**

        - check for conserved start and stop codons
        - iterate over all three frames
        - check each start and the next sufficiently far away stop codon
        - check if all ungapped seqs between the start and stop codon are >= min_length
        - check that no ungapped seq in the alignment has a stop codon in between
        - write to dictionary
        - classify as internal if the stop codon has already been written with a prior start
        - repeat for the reverse complement

        :return: ORF positions and internal ORF positions
        """

        # helper functions
        def determine_conserved_start_stops(alignment: dict, alignment_length: int) -> tuple:
            """
            Determine all start and stop codons within an alignment.
            :param alignment: alignment
            :param alignment_length: length of alignment
            :return: start and stop codon positions
            """
            starts = config.START_CODONS[self.aln_type]
            stops = config.STOP_CODONS[self.aln_type]

            list_of_starts, list_of_stops = [], []
            ref = alignment[list(alignment.keys())[0]]
            for nt_position in range(alignment_length):
                if ref[nt_position:nt_position + 3] in starts:
                    conserved_start = True
                    for sequence in alignment:
                        if not alignment[sequence][nt_position:].replace('-', '')[0:3] in starts:
                            conserved_start = False
                            break
                    if conserved_start:
                        list_of_starts.append(nt_position)

                if ref[nt_position:nt_position + 3] in stops:
                    conserved_stop = True
                    for sequence in alignment:
                        if not alignment[sequence][nt_position:].replace('-', '')[0:3] in stops:
                            conserved_stop = False
                            break
                    if conserved_stop:
                        list_of_stops.append(nt_position)

            return list_of_starts, list_of_stops

        def get_ungapped_sliced_seqs(alignment: dict, start_pos: int, stop_pos: int) -> list:
            """
            Slice the sequences between start and stop codons and eliminate gaps.
            :param alignment: alignment
            :param start_pos: start codon
            :param stop_pos: stop codon
            :return: sliced sequences
            """
            ungapped_seqs = []
            for seq_id in alignment:
                ungapped_seqs.append(alignment[seq_id][start_pos:stop_pos + 3].replace('-', ''))

            return ungapped_seqs

        def additional_stops(ungapped_seqs: list) -> bool:
            """
            Checks for the presence of an additional stop codon.
            :param ungapped_seqs: list of ungapped sequences
            :return: additional stop codons (True/False)
            """
            stops = config.STOP_CODONS[self.aln_type]

            for sliced_seq in ungapped_seqs:
                for position in range(0, len(sliced_seq) - 3, 3):
                    if sliced_seq[position:position + 3] in stops:
                        return True
            return False

        def calculate_identity(identity_matrix: ndarray, aln_slice: list) -> float:
            sliced_array = identity_matrix[:, aln_slice[0]:aln_slice[1]] + 1  # identical = 0, different = -1 --> add 1
            return np.sum(np.all(sliced_array == 1, axis=0)) / (aln_slice[1] - aln_slice[0]) * 100

        # checks for arguments
        if self.aln_type == 'AA':
            raise TypeError('ORF search only for RNA/DNA alignments')

        if identity_cutoff is not None:
            if identity_cutoff > 100 or identity_cutoff < 0:
                raise ValueError('conservation cutoff must be between 0 and 100')

        if min_length <= 6 or min_length > self.length:
            raise ValueError(f'min_length must be between 6 and {self.length}')

        # ini
        identities = self.calc_identity_alignment()
        alignments = [self.alignment, self.calc_reverse_complement_alignment()]
        aln_len = self.length

        orf_counter = 0
        orf_dict = {}

        for aln, direction in zip(alignments, ['+', '-']):
            # check for starts and stops in the first seq and then check if these are present in all seqs
            conserved_starts, conserved_stops = determine_conserved_start_stops(aln, aln_len)
            # check each frame
            for frame in (0, 1, 2):
                potential_starts = [x for x in conserved_starts if x % 3 == frame]
                potential_stops = [x for x in conserved_stops if x % 3 == frame]
                last_stop = -1
                for start in potential_starts:
                    # go to the next stop that is sufficiently far away in the alignment
                    next_stops = [x for x in potential_stops if x + 3 >= start + min_length]
                    if not next_stops:
                        continue
                    next_stop = next_stops[0]
                    ungapped_sliced_seqs = get_ungapped_sliced_seqs(aln, start, next_stop)
                    # re-check the lengths of all ungapped seqs
                    ungapped_seq_lengths = [len(x) >= min_length for x in ungapped_sliced_seqs]
                    if not all(ungapped_seq_lengths):
                        continue
                    # if no stop codon between start and stop --> write to dictionary
                    if not additional_stops(ungapped_sliced_seqs):
                        if direction == '+':
                            positions = [start, next_stop + 3]
                        else:
                            positions = [aln_len - next_stop - 3, aln_len - start]
                        if last_stop != next_stop:
                            last_stop = next_stop
                            conservation = calculate_identity(identities, positions)
                            if identity_cutoff is not None and conservation < identity_cutoff:
                                continue
                            orf_dict[f'ORF_{orf_counter}'] = {'location': [positions],
                                                              'frame': frame,
                                                              'strand': direction,
                                                              'conservation': conservation,
                                                              'internal': []
                                                              }
                            orf_counter += 1
                        else:
                            if orf_dict:
                                orf_dict[f'ORF_{orf_counter - 1}']['internal'].append(positions)

        return orf_dict
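
    # A usage sketch for the ORF search above (identity_cutoff in percent):
    #   orfs = msa.get_conserved_orfs(min_length=150, identity_cutoff=90)
    #   for orf_id, orf in orfs.items():
    #       print(orf_id, orf['location'], orf['strand'], orf['frame'], orf['conservation'])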

    def get_non_overlapping_conserved_orfs(self, min_length: int = 100, identity_cutoff: float | None = None) -> dict:
        """
        First calculates all ORFs and then searches from the 5' end for
        all non-overlapping ORFs in the fw strand and from the
        3' end for all non-overlapping ORFs in the rv strand.

        **No overlap algorithm:**

        **frame 1:** -[M------*]--- ----[M--*]---------[M-----

        **frame 2:** -------[M------*]---------[M---*]--------

        **frame 3:** [M---*]-----[M----------*]----------[M---

        **results:** [M---*][M------*]--[M--*]-[M---*]-[M-----

        frame:          3       2        1       2       1

        :return: dictionary with non-overlapping ORFs
        """
        orf_dict = self.get_conserved_orfs(min_length, identity_cutoff)

        fw_orfs, rw_orfs = [], []

        for orf in orf_dict:
            if orf_dict[orf]['strand'] == '+':
                fw_orfs.append((orf, orf_dict[orf]['location'][0]))
            else:
                rw_orfs.append((orf, orf_dict[orf]['location'][0]))

        fw_orfs.sort(key=lambda x: x[1][0])  # sort by start pos
        rw_orfs.sort(key=lambda x: x[1][1], reverse=True)  # sort by stop pos
        non_overlapping_orfs = []
        for orf_list, strand in zip([fw_orfs, rw_orfs], ['+', '-']):
            previous_stop = -1 if strand == '+' else self.length + 1
            for orf in orf_list:
                if strand == '+' and orf[1][0] > previous_stop:
                    non_overlapping_orfs.append(orf[0])
                    previous_stop = orf[1][1]
                elif strand == '-' and orf[1][1] < previous_stop:
                    non_overlapping_orfs.append(orf[0])
                    previous_stop = orf[1][0]

        non_overlap_dict = {}
        for orf in orf_dict:
            if orf in non_overlapping_orfs:
                non_overlap_dict[orf] = orf_dict[orf]

        return non_overlap_dict

    def calc_length_stats(self) -> dict:
        """
        Determine the stats for the length of the ungapped seqs in the alignment.
        :return: dictionary with length stats
        """

        seq_lengths = [len(self.alignment[x].replace('-', '')) for x in self.alignment]

        return {'number of seq': len(self.alignment),
                'mean length': float(np.mean(seq_lengths)),
                'std length': float(np.std(seq_lengths)),
                'min length': int(np.min(seq_lengths)),
                'max length': int(np.max(seq_lengths))
                }

    def calc_entropy(self) -> list:
        """
        Calculate the normalized Shannon's entropy for every position in an alignment:

        - 1: high entropy
        - 0: low entropy

        :return: Entropies at each position.
        """

        # helper functions
        def shannons_entropy(character_list: list, states: int, aln_type: str) -> float:
            """
            Calculate the Shannon's entropy of a sequence,
            normalized between 0 and 1.
            :param character_list: characters at an alignment position
            :param states: number of potential characters that can be present
            :param aln_type: type of the alignment
            :returns: entropy
            """
            ent, n_chars = 0, len(character_list)
            # only one char is in the list
            if n_chars <= 1:
                return ent
            # calculate the number of unique chars and their counts
            chars, char_counts = np.unique(character_list, return_counts=True)
            char_counts = char_counts.astype(float)
            # ignore gaps for entropy calc
            char_counts, chars = char_counts[chars != "-"], chars[chars != "-"]
            # correctly handle ambiguous chars
            index_to_drop = []
            for index, char in enumerate(chars):
                if char in config.AMBIG_CHARS[aln_type]:
                    index_to_drop.append(index)
                    amb_chars, amb_counts = np.unique(config.AMBIG_CHARS[aln_type][char], return_counts=True)
                    amb_counts = amb_counts / len(config.AMBIG_CHARS[aln_type][char])
                    # add the proportionate numbers to the initial array
                    for amb_char, amb_count in zip(amb_chars, amb_counts):
                        if amb_char in chars:
                            char_counts[chars == amb_char] += amb_count
                        else:
                            chars, char_counts = np.append(chars, amb_char), np.append(char_counts, amb_count)
            # drop the ambiguous characters from the array
            char_counts, chars = np.delete(char_counts, index_to_drop), np.delete(chars, index_to_drop)
            # calc the entropy
            probs = char_counts / n_chars
            if np.count_nonzero(probs) <= 1:
                return ent
            for prob in probs:
                ent -= prob * math.log(prob, states)

            return ent

        aln = self.alignment
        entropies = []

        if self.aln_type == 'AA':
            states = 20
        else:
            states = 4
        # iterate over alignment positions and the sequences
        for nuc_pos in range(self.length):
            pos = []
            for record in aln:
                pos.append(aln[record][nuc_pos])
            entropies.append(shannons_entropy(pos, states, self.aln_type))

        return entropies
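
    # Note on the normalization above: with s states (4 for nt, 20 for AA) and
    # per-column character probabilities p_i (gaps excluded from the counts),
    # H = -sum(p_i * log_s(p_i)), so a fully conserved column scores 0 and a
    # column where all s states are equally likely scores 1.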

    def calc_gc(self) -> list:
        """
        Determine the GC content for every position in a nt alignment.
        :return: GC content for every position.
        :raises: TypeError for AA alignments
        """
        if self.aln_type == 'AA':
            raise TypeError("GC computation is not possible for amino acid alignments")

        gc, aln, amb_nucs = [], self.alignment, config.AMBIG_CHARS[self.aln_type]

        for position in range(self.length):
            nucleotides = str()
            for record in aln:
                nucleotides = nucleotides + aln[record][position]
            # ini dict with the chars that occur and which ones to
            # count with which freq
            to_count = {
                'G': 1,
                'C': 1,
            }
            # handle ambig. nuc
            for char in amb_nucs:
                if char in nucleotides:
                    to_count[char] = (amb_nucs[char].count('C') + amb_nucs[char].count('G')) / len(amb_nucs[char])

            gc.append(
                sum([nucleotides.count(x) * to_count[x] for x in to_count]) / len(nucleotides)
            )

        return gc

    def calc_coverage(self) -> list:
        """
        Determine the coverage of every position in an alignment,
        defined as:

        `1 - fraction of gap characters ('-') at the position`

        :return: Coverage at each alignment position.
        """
        coverage, aln = [], self.alignment

        for nuc_pos in range(self.length):
            pos = str()
            for record in aln.keys():
                pos = pos + aln[record][nuc_pos]
            coverage.append(1 - pos.count('-') / len(pos))

        return coverage

    def calc_reverse_complement_alignment(self) -> dict:
        """
        Reverse complement the alignment.
        :return: Alignment (rv)
        """
        if self.aln_type == 'AA':
            raise TypeError('Reverse complement only for RNA or DNA.')

        aln = self.alignment
        reverse_complement_dict = {}

        for seq_id in aln:
            reverse_complement_dict[seq_id] = ''.join(config.COMPLEMENT[base] for base in reversed(aln[seq_id]))

        return reverse_complement_dict

    def calc_numerical_alignment(self, encode_mask: bool = False, encode_ambiguities: bool = False):
        """
        Transforms the alignment to numerical values. Ambiguities are encoded as -3, mask as -2 and the
        remaining chars with the idx + 1 of config.CHAR_COLORS[self.aln_type]['standard'].

        :param encode_ambiguities: encode ambiguities as -3
        :param encode_mask: encode mask ('N'/'X') as -2
        :return: numerical matrix
        """

        aln = self.alignment
        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
        # ini matrix
        numerical_matrix = np.full(sequences.shape, np.nan, dtype=float)
        # first encode the mask
        if encode_mask:
            if self.aln_type == 'AA':
                is_n_or_x = np.isin(sequences, ['X'])
            else:
                is_n_or_x = np.isin(sequences, ['N'])
            numerical_matrix[is_n_or_x] = -2
        # next encode ambig chars
        if encode_ambiguities:
            numerical_matrix[np.isin(sequences, [key for key in config.AMBIG_CHARS[self.aln_type] if key not in ['N', 'X', '-']])] = -3
        # next convert each char into its respective value
        for idx, char in enumerate(config.CHAR_COLORS[self.aln_type]['standard']):
            numerical_matrix[np.isin(sequences, [char])] = idx + 1

        return numerical_matrix

    def calc_identity_alignment(self, encode_mismatches: bool = True, encode_mask: bool = False, encode_gaps: bool = True, encode_ambiguities: bool = False, encode_each_mismatch_char: bool = False) -> np.ndarray:
        """
        Converts the alignment to an identity array (identical = 0) compared to the majority consensus or the reference:\n

        :param encode_mismatches: encode mismatch as -1
        :param encode_mask: encode mask with value=-2 --> also in the reference
        :param encode_gaps: encode gaps with np.nan --> also in the reference
        :param encode_ambiguities: encode ambiguities with value=-3
        :param encode_each_mismatch_char: for each mismatch encode characters separately - these values represent the idx+1 values of config.CHAR_COLORS[self.aln_type]['standard']
        :return: identity alignment
        """

        aln = self.alignment
        ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus()

        # convert the alignment to an array
        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
        reference = np.array(list(ref))
        # ini matrix
        identity_matrix = np.full(sequences.shape, 0, dtype=float)

        is_identical = sequences == reference

        if encode_gaps:
            is_gap = sequences == '-'
        else:
            is_gap = np.full(sequences.shape, False)

        if encode_mask:
            if self.aln_type == 'AA':
                is_n_or_x = np.isin(sequences, ['X'])
            else:
                is_n_or_x = np.isin(sequences, ['N'])
        else:
            is_n_or_x = np.full(sequences.shape, False)

        if encode_ambiguities:
            is_ambig = np.isin(sequences, [key for key in config.AMBIG_CHARS[self.aln_type] if key not in ['N', 'X', '-']])
        else:
            is_ambig = np.full(sequences.shape, False)

        if encode_mismatches:
            is_mismatch = ~is_gap & ~is_identical & ~is_n_or_x & ~is_ambig
        else:
            is_mismatch = np.full(sequences.shape, False)

        # encode every different character
        if encode_each_mismatch_char:
            for idx, char in enumerate(config.CHAR_COLORS[self.aln_type]['standard']):
                new_encoding = np.isin(sequences, [char]) & is_mismatch
                identity_matrix[new_encoding] = idx + 1
        # or encode differences with a single value
        else:
            identity_matrix[is_mismatch] = -1  # mismatch

        identity_matrix[is_gap] = np.nan  # gap
        identity_matrix[is_n_or_x] = -2  # 'N' or 'X'
        identity_matrix[is_ambig] = -3  # ambiguities

        return identity_matrix
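
    # A usage sketch for the encodings above (values as documented in the docstring):
    #   ident = msa.calc_identity_alignment(encode_gaps=True, encode_ambiguities=True)
    #   # ident[i, j]: 0 = identical to ref/consensus, -1 = mismatch,
    #   #              -3 = ambiguous char, np.nan = gap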
850 """ 851 852 aln = self.alignment 853 ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus() 854 if matrix_type is None: 855 if self.aln_type == 'AA': 856 matrix_type = 'BLOSUM65' 857 else: 858 matrix_type = 'TRANS' 859 # load substitution matrix as dictionary 860 try: 861 subs_matrix = config.SUBS_MATRICES[self.aln_type][matrix_type] 862 except KeyError: 863 raise ValueError( 864 f'The specified matrix does not exist for alignment type.\nAvailable matrices for {self.aln_type} are:\n{list(config.SUBS_MATRICES[self.aln_type].keys())}' 865 ) 866 867 # set dtype and convert alignment to a NumPy array for vectorized processing 868 dtype = np.dtype(float, metadata={'matrix': matrix_type}) 869 sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())]) 870 reference = np.array(list(ref)) 871 valid_chars = list(subs_matrix.keys()) 872 similarity_array = np.full(sequences.shape, np.nan, dtype=dtype) 873 874 for j, ref_char in enumerate(reference): 875 if ref_char not in valid_chars + ['-']: 876 continue 877 # Get local min and max for the reference residue 878 if normalize and ref_char != '-': 879 local_scores = subs_matrix[ref_char].values() 880 local_min, local_max = min(local_scores), max(local_scores) 881 882 for i, char in enumerate(sequences[:, j]): 883 if char not in valid_chars: 884 continue 885 # classify the similarity as max if the reference has a gap 886 similarity_score = subs_matrix[char][ref_char] if ref_char != '-' else 1 887 similarity_array[i, j] = (similarity_score - local_min) / (local_max - local_min) if normalize and ref_char != '-' else similarity_score 888 889 return similarity_array 890 891 def calc_position_matrix(self, matrix_type:str='PWM') -> np.ndarray | ValueError: 892 """ 893 Calculates a position matrix of the specified type for the given alignment. The function 894 supports generating matrices of types Position Frequency Matrix (PFM), Position Probability 895 Matrix (PPM), Position Weight Matrix (PWM), and cummulative Information Content (IC). It validates 896 the provided matrix type and includes pseudo-count adjustments to ensure robust calculations. 897 898 :param matrix_type: Type of position matrix to calculate. Accepted values are 'PFM', 'PPM', 899 'PWM', and 'IC'. Defaults to 'PWM'. 900 :type matrix_type: str 901 :raises ValueError: If the provided `matrix_type` is not one of the accepted values. 902 :return: A numpy array representing the calculated position matrix of the specified type. 

    def calc_position_matrix(self, matrix_type: str = 'PWM') -> np.ndarray:
        """
        Calculates a position matrix of the specified type for the given alignment. The function
        supports generating matrices of the types Position Frequency Matrix (PFM), Position Probability
        Matrix (PPM), Position Weight Matrix (PWM) and cumulative Information Content (IC). It validates
        the provided matrix type and includes pseudo-count adjustments to ensure robust calculations.

        :param matrix_type: Type of position matrix to calculate. Accepted values are 'PFM', 'PPM',
            'PWM', and 'IC'. Defaults to 'PWM'.
        :type matrix_type: str
        :raises ValueError: If the provided `matrix_type` is not one of the accepted values.
        :return: A numpy array representing the calculated position matrix of the specified type.
        :rtype: np.ndarray
        """

        # ini
        aln = self.alignment
        if matrix_type not in ['PFM', 'PPM', 'IC', 'PWM']:
            raise ValueError('matrix_type must be PFM, PPM, IC or PWM.')
        possible_chars = list(config.CHAR_COLORS[self.aln_type]['standard'].keys())
        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])

        # calc position frequency matrix
        pfm = np.array([np.sum(sequences == char, 0) for char in possible_chars])
        if matrix_type == 'PFM':
            return pfm

        # calc position probability matrix (probability)
        pseudo_count = 0.0001  # to avoid 0 values
        pfm = pfm + pseudo_count
        ppm_non_char_excluded = pfm / np.sum(pfm, axis=0)  # use this for pwm/ic calculation
        ppm = pfm / len(aln.keys())  # calculate the frequency based on the row number
        if matrix_type == 'PPM':
            return ppm

        # calc position weight matrix (log-likelihood)
        pwm = np.log2(ppm_non_char_excluded * len(possible_chars))
        if matrix_type == 'PWM':
            return pwm

        # calc information content per position (in bits) - can be used to scale a ppm for sequence logos
        ic = np.sum(ppm_non_char_excluded * pwm, axis=0)
        return ic
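
    # The relationship between the matrices above (pseudo-counts already applied;
    # PWM/IC use the column-normalized PPM' = PFM / column_sum, while the returned
    # PPM divides by the number of sequences instead):
    #   PWM[c, j] = log2(PPM'[c, j] * n_chars)
    #   IC[j]     = sum_c PPM'[c, j] * PWM[c, j]   # bits per position, e.g. for sequence logos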

    def calc_percent_recovery(self) -> dict:
        """
        Recovery per sequence, either compared to the majority consensus seq
        or the reference seq.\n
        Defined as:\n

        `(1 - sum(N/X/- characters in ungapped ref regions)) * 100`

        This is highly similar to how nextclade calculates recovery over the reference.

        :return: dict
        """

        aln = self.alignment

        if self.reference_id is not None:
            ref = aln[self.reference_id]
        else:
            ref = self.get_consensus()  # majority consensus

        if not any(char != '-' for char in ref):
            raise ValueError("Reference sequence is entirely gapped, cannot calculate recovery.")

        # count 'N', 'X' and '-' chars in non-gapped regions
        recovery_over_ref = dict()

        # get positions of non-gap characters in the reference
        non_gap_positions = [i for i, char in enumerate(ref) if char != '-']
        cumulative_length = len(non_gap_positions)

        # calculate recovery
        for seq_id in aln:
            if seq_id == self.reference_id:
                continue
            seq = aln[seq_id]
            count_invalid = sum(
                seq[pos] == '-' or
                (seq[pos] == 'X' if self.aln_type == "AA" else seq[pos] == 'N')
                for pos in non_gap_positions
            )
            recovery_over_ref[seq_id] = (1 - count_invalid / cumulative_length) * 100

        return recovery_over_ref

    def calc_character_frequencies(self) -> dict:
        """
        Calculate the character frequencies in the alignment:
        The frequencies are counted per seq and in total. The
        percentage of non-gap characters in the alignment is
        relative to the total number of non-gap characters.
        The gap percentage is relative to the sequence length.

        The output is a nested dictionary.

        :return: Character frequencies
        """

        aln, aln_length = self.alignment, self.length

        freqs = {'total': {'-': {'counts': 0, '% of alignment': float()}}}

        for seq_id in aln:
            freqs[seq_id], all_chars = {'-': {'counts': 0, '% of alignment': float()}}, 0
            unique_chars = set(aln[seq_id])
            for char in unique_chars:
                if char == '-':
                    continue
                # add characters to dictionaries
                if char not in freqs[seq_id]:
                    freqs[seq_id][char] = {'counts': 0, '% of non-gapped': 0}
                if char not in freqs['total']:
                    freqs['total'][char] = {'counts': 0, '% of non-gapped': 0}
                # count non-gap chars
                freqs[seq_id][char]['counts'] += aln[seq_id].count(char)
                freqs['total'][char]['counts'] += freqs[seq_id][char]['counts']
                all_chars += freqs[seq_id][char]['counts']
            # normalize counts
            for char in freqs[seq_id]:
                if char == '-':
                    continue
                freqs[seq_id][char]['% of non-gapped'] = freqs[seq_id][char]['counts'] / all_chars * 100
                freqs['total'][char]['% of non-gapped'] += freqs[seq_id][char]['% of non-gapped']
            # count gaps
            freqs[seq_id]['-']['counts'] = aln[seq_id].count('-')
            freqs['total']['-']['counts'] += freqs[seq_id]['-']['counts']
            # normalize gap counts
            freqs[seq_id]['-']['% of alignment'] = freqs[seq_id]['-']['counts'] / aln_length * 100
            freqs['total']['-']['% of alignment'] += freqs[seq_id]['-']['% of alignment']

        # normalize the total counts
        for char in freqs['total']:
            for value in freqs['total'][char]:
                if value == '% of alignment' or value == '% of non-gapped':
                    freqs['total'][char][value] = freqs['total'][char][value] / len(aln)

        return freqs
1053 """ 1054 1055 def hamming_distance(seq1: str, seq2: str) -> int: 1056 return sum(c1 == c2 for c1, c2 in zip(seq1, seq2)) 1057 1058 def ghd(seq1: str, seq2: str) -> float: 1059 return hamming_distance(seq1, seq2) / self.length * 100 1060 1061 def lhd(seq1, seq2): 1062 # Trim gaps from both sides 1063 i, j = 0, self.length - 1 1064 while i < self.length and (seq1[i] == '-' or seq2[i] == '-'): 1065 i += 1 1066 while j >= 0 and (seq1[j] == '-' or seq2[j] == '-'): 1067 j -= 1 1068 if i > j: 1069 return 0.0 1070 1071 seq1_, seq2_ = seq1[i:j + 1], seq2[i:j + 1] 1072 matches = sum(c1 == c2 for c1, c2 in zip(seq1_, seq2_)) 1073 length = j - i + 1 1074 return (matches / length) * 100 if length > 0 else 0.0 1075 1076 def ged(seq1: str, seq2: str) -> float: 1077 1078 matches, mismatches = 0, 0 1079 1080 for c1, c2 in zip(seq1, seq2): 1081 if c1 != '-' and c2 != '-': 1082 if c1 == c2: 1083 matches += 1 1084 else: 1085 mismatches += 1 1086 return matches / (matches + mismatches) * 100 if (matches + mismatches) > 0 else 0 1087 1088 def gcd(seq1: str, seq2: str) -> float: 1089 matches = 0 1090 mismatches = 0 1091 in_gap = False 1092 1093 for char1, char2 in zip(seq1, seq2): 1094 if char1 == '-' and char2 == '-': # Shared gap: do nothing 1095 continue 1096 elif char1 == '-' or char2 == '-': # Gap in only one sequence 1097 if not in_gap: # Start of a new gap stretch 1098 mismatches += 1 1099 in_gap = True 1100 else: # No gaps 1101 in_gap = False 1102 if char1 == char2: # Matching characters 1103 matches += 1 1104 else: # Mismatched characters 1105 mismatches += 1 1106 1107 return matches / (matches + mismatches) * 100 if (matches + mismatches) > 0 else 0 1108 1109 1110 # Map distance type to corresponding function 1111 distance_functions: Dict[str, Callable[[str, str], float]] = { 1112 'ghd': ghd, 1113 'lhd': lhd, 1114 'ged': ged, 1115 'gcd': gcd 1116 } 1117 1118 if distance_type not in distance_functions: 1119 raise ValueError(f"Invalid distance type '{distance_type}'. Choose from {list(distance_functions.keys())}.") 1120 1121 # Compute pairwise distances 1122 aln = self.alignment 1123 distance_func = distance_functions[distance_type] 1124 distance_matrix = np.zeros((len(aln), len(aln))) 1125 1126 sequences = list(aln.values()) 1127 n = len(sequences) 1128 for i in range(n): 1129 seq1 = sequences[i] 1130 for j in range(i, n): 1131 seq2 = sequences[j] 1132 dist = distance_func(seq1, seq2) 1133 distance_matrix[i, j] = dist 1134 distance_matrix[j, i] = dist 1135 1136 return distance_matrix 1137 1138 def get_snps(self, include_ambig:bool=False) -> dict: 1139 """ 1140 Calculate snps similar to snp-sites (output is comparable): 1141 https://github.com/sanger-pathogens/snp-sites 1142 Importantly, SNPs are only considered if at least one of the snps is not an ambiguous character. 1143 The SNPs are compared to a majority consensus sequence or to a reference if it has been set. 1144 1145 :param include_ambig: Include ambiguous snps (default: False) 1146 :return: dictionary containing snp positions and their variants including their frequency. 
1147 """ 1148 aln = self.alignment 1149 ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus() 1150 aln = {x: aln[x] for x in aln.keys() if x != self.reference_id} 1151 seq_ids = list(aln.keys()) 1152 snp_dict = {'#CHROM': self.reference_id if self.reference_id is not None else 'consensus', 'POS': {}} 1153 1154 for pos in range(self.length): 1155 reference_char = ref[pos] 1156 if not include_ambig: 1157 if reference_char in config.AMBIG_CHARS[self.aln_type] and reference_char != '-': 1158 continue 1159 alt_chars, snps = [], [] 1160 for i, seq_id in enumerate(aln.keys()): 1161 alt_chars.append(aln[seq_id][pos]) 1162 if reference_char != aln[seq_id][pos]: 1163 snps.append(i) 1164 if not snps: 1165 continue 1166 if include_ambig: 1167 if all(alt_chars[x] in config.AMBIG_CHARS[self.aln_type] for x in snps): 1168 continue 1169 else: 1170 snps = [x for x in snps if alt_chars[x] not in config.AMBIG_CHARS[self.aln_type]] 1171 if not snps: 1172 continue 1173 if pos not in snp_dict: 1174 snp_dict['POS'][pos] = {'ref': reference_char, 'ALT': {}} 1175 for snp in snps: 1176 if alt_chars[snp] not in snp_dict['POS'][pos]['ALT']: 1177 snp_dict['POS'][pos]['ALT'][alt_chars[snp]] = { 1178 'AF': 1, 1179 'SEQ_ID': [seq_ids[snp]] 1180 } 1181 else: 1182 snp_dict['POS'][pos]['ALT'][alt_chars[snp]]['AF'] += 1 1183 snp_dict['POS'][pos]['ALT'][alt_chars[snp]]['SEQ_ID'].append(seq_ids[snp]) 1184 # calculate AF 1185 if pos in snp_dict['POS']: 1186 for alt in snp_dict['POS'][pos]['ALT']: 1187 snp_dict['POS'][pos]['ALT'][alt]['AF'] /= len(aln) 1188 1189 return snp_dict 1190 1191 def calc_transition_transversion_score(self) -> list: 1192 """ 1193 Based on the snp positions, calculates a transition/transversions score. 1194 A positive score means higher ratio of transitions and negative score means 1195 a higher ratio of transversions. 1196 :return: list 1197 """ 1198 1199 if self.aln_type == 'AA': 1200 raise TypeError('TS/TV scoring only for RNA/DNA alignments') 1201 1202 # ini 1203 snps = self.get_snps() 1204 score = [0]*self.length 1205 1206 for pos in snps['POS']: 1207 t_score_temp = 0 1208 for alt in snps['POS'][pos]['ALT']: 1209 # check the type of substitution 1210 if snps['POS'][pos]['ref'] + alt in ['AG', 'GA', 'CT', 'TC', 'CU', 'UC']: 1211 score[pos] += snps['POS'][pos]['ALT'][alt]['AF'] 1212 else: 1213 score[pos] -= snps['POS'][pos]['ALT'][alt]['AF'] 1214 1215 return score 1216 1217 1218class Annotation: 1219 """ 1220 An annotation class that allows to read in gff, gb or bed files and adjust its locations to that of the MSA. 1221 """ 1222 1223 def __init__(self, aln: MSA, annotation_path: str): 1224 """ 1225 The annotation class. Lets you parse multiple standard formats 1226 which might be used for annotating an alignment. The main purpose 1227 is to parse the annotation file and adapt the locations of diverse 1228 features to the locations within the alignment, considering the 1229 respective alignment positions. Importantly, IDs of the alignment 1230 and the MSA have to partly match. 


class Annotation:
    """
    An annotation class that allows reading in gff, gb or bed files and adjusting their locations to that of the MSA.
    """

    def __init__(self, aln: MSA, annotation_path: str):
        """
        The annotation class. Lets you parse multiple standard formats
        which might be used for annotating an alignment. The main purpose
        is to parse the annotation file and adapt the locations of diverse
        features to the locations within the alignment, considering the
        respective alignment positions. Importantly, the IDs of the annotation
        and the MSA have to partly match.

        :param aln: MSA class
        :param annotation_path: path to annotation file (gb, bed, gff) or raw string
        """

        self.ann_type, self._seq_id, self.locus, self.features = self._parse_annotation(annotation_path, aln)  # read annotation
        self._gapped_seq = self._MSA_validation_and_seq_extraction(aln, self._seq_id)  # extract the gapped sequence
        self._position_map = self._build_position_map()  # build a position map
        self._map_to_alignment()  # adapt feature locations

    @staticmethod
    def _MSA_validation_and_seq_extraction(aln: MSA, seq_id: str) -> str:
        """
        Extract the gapped sequence from the MSA that corresponds to the annotation.
        :param aln: MSA class
        :param seq_id: sequence id to extract
        :return: gapped sequence
        """
        if not isinstance(aln, MSA):
            raise ValueError('alignment has to be an MSA class. use explore.MSA() to read in the alignment')
        else:
            return aln._alignment[seq_id]

    @staticmethod
    def _parse_annotation(annotation_path: str, aln: MSA) -> tuple[str, str, str, Dict]:

        def detect_annotation_type(file_path: str) -> str:
            """
            Detect the type of annotation file (GenBank, GFF, or BED) based
            on the first relevant line (excluding empty lines and # comments).

            :param file_path: Path to the annotation file.
            :return: The detected file type ('gb', 'gff', or 'bed').

            :raises ValueError: If the file type cannot be determined.
            """

            with _get_line_iterator(file_path) as file:
                for line in file:
                    # skip empty lines and comments
                    if not line.strip() or line.startswith('#'):
                        continue
                    # genbank
                    if line.startswith('LOCUS'):
                        return 'gb'
                    # gff
                    if len(line.split('\t')) == 9:
                        # check for expected values
                        columns = line.split('\t')
                        if columns[6] in ['+', '-', '.'] and re.match(r'^\d+$', columns[3]) and re.match(r'^\d+$', columns[4]):
                            return 'gff'
                    # BED files are tab-delimited with at least 3 fields: chrom, start, end
                    fields = line.split('\t')
                    if len(fields) >= 3 and re.match(r'^\d+$', fields[1]) and re.match(r'^\d+$', fields[2]):
                        return 'bed'
                    # only read in the first line
                    break

            raise ValueError(
                "File type could not be determined. Ensure the file follows a recognized format (GenBank, GFF, or BED).")

        def parse_gb(file_path) -> dict:
            """
            Parse a GenBank file to a dictionary - primarily retained is the information
            of the qualifiers as these will be used for plotting.

            :param file_path: path to GenBank file
            :return: nested dictionary
            """

            def sanitize_gb_location(string: str) -> tuple[list, str]:
                """
                see: https://www.insdc.org/submitting-standards/feature-table/
                """
                strand = '+'
                locations = []
                # check the direction of the annotation
                if 'complement' in string:
                    strand = '-'
                # sanitize operators
                for operator in ['complement(', 'join(', 'order(']:
                    string = string.replace(operator, '')
                # sanitize possible chars for splitting start and stop -
                # however, in the future this might not simply be done
                # as some useful information is retained
                for char in ['>', '<', ')']:
                    string = string.replace(char, '')
                # check if we have multiple locations, e.g. due to splicing
                if ',' in string:
                    raw_locations = string.split(',')
                else:
                    raw_locations = [string]
                # try to split start and stop
                for location in raw_locations:
                    for sep in ['..', '.', '^']:
                        if sep in location:
                            sanitized_locations = [int(x) for x in location.split(sep)]
                            sanitized_locations[0] = sanitized_locations[0] - 1  # enforce 0-based starts
                            locations.append(sanitized_locations)
                            break

                return locations, strand

            records = {}
            with _get_line_iterator(file_path) as file:
                record = None
                in_features = False
                counter_dict = {}
                for line in file:
                    line = line.rstrip()
                    parts = line.split()
                    # extract the locus id
                    if line.startswith('LOCUS'):
                        if record:
                            records[record['locus']] = record
                        record = {
                            'locus': parts[1],
                            'features': {}
                        }

                    elif line.startswith('FEATURES'):
                        in_features = True

                    # ignore the sequence info
                    elif line.startswith('ORIGIN'):
                        in_features = False

                    # now write useful feature information to the dictionary
                    elif in_features:
                        if not line.strip():
                            continue
                        if line[5] != ' ':
                            location_line = True  # remember that we are in a location for multi-line locations
                            feature_type, qualifier = parts[0], parts[1]
                            if feature_type not in record['features']:
                                record['features'][feature_type] = {}
                                counter_dict[feature_type] = 0
                            locations, strand = sanitize_gb_location(qualifier)
                            record['features'][feature_type][counter_dict[feature_type]] = {
                                'location': locations,
                                'strand': strand
                            }
                            counter_dict[feature_type] += 1
                        else:
                            # edge case for multi-line locations
                            if location_line and not line.strip().startswith('/'):
                                locations, strand = sanitize_gb_location(parts[0])
                                for loc in locations:
                                    record['features'][feature_type][counter_dict[feature_type] - 1]['location'].append(loc)
                            else:
                                location_line = False
                                try:
                                    qualifier_type, qualifier = parts[0].split('=')
                                except ValueError:  # we are in the coding sequence
                                    qualifier = qualifier + parts[0]

                                qualifier_type, qualifier = qualifier_type.lstrip('/'), qualifier.strip('"')
                                last_index = counter_dict[feature_type] - 1
                                record['features'][feature_type][last_index][qualifier_type] = qualifier

                records[record['locus']] = record

            return records
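
        # An illustration of the location sanitizer above (0-based starts enforced):
        #   sanitize_gb_location('join(12..78,134..202)')  -> ([[11, 78], [133, 202]], '+')
        #   sanitize_gb_location('complement(34..126)')    -> ([[33, 126]], '-')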

        def parse_gff(file_path) -> dict:
            """
            Parse a GFF3 (General Feature Format) file into a dictionary structure.

            :param file_path: path to GFF3 file
            :return: nested dictionary
            """
            records = {}
            with _get_line_iterator(file_path) as file:
                previous_id, previous_feature = None, None
                for line in file:
                    if line.startswith('#') or not line.strip():
                        continue
                    parts = line.strip().split('\t')
                    seqid, source, feature_type, start, end, score, strand, phase, attributes = parts
                    # ensure that region and source features are not named differently for gff and gb
                    if feature_type == 'region':
                        feature_type = 'source'
                    if seqid not in records:
                        records[seqid] = {'locus': seqid, 'features': {}}
                    if feature_type not in records[seqid]['features']:
                        records[seqid]['features'][feature_type] = {}

                    feature_id = len(records[seqid]['features'][feature_type])
                    feature = {
                        'strand': strand,
                    }

                    # parse attributes into key-value pairs
                    for attr in attributes.split(';'):
                        if '=' in attr:
                            key, value = attr.split('=', 1)
                            feature[key.strip()] = value.strip()

                    # check if the features are the same --> possible splicing
                    if previous_id is not None and previous_feature == feature:
                        records[seqid]['features'][feature_type][previous_id]['location'].append([int(start) - 1, int(end)])
                    else:
                        records[seqid]['features'][feature_type][feature_id] = feature
                        records[seqid]['features'][feature_type][feature_id]['location'] = [[int(start) - 1, int(end)]]
                        # set new previous id and feature -> new dict, as 'location' is pointed to in the current feature and this
                        # is the only key that differs if the next feature has the same entries
                        previous_id, previous_feature = feature_id, {key: value for key, value in feature.items() if key != 'location'}

            return records
1448 1449 :param file_path: path to genebank file 1450 :return: nested dictionary 1451 1452 """ 1453 records = {} 1454 with _get_line_iterator(file_path) as file: 1455 for line in file: 1456 if line.startswith('#') or not line.strip(): 1457 continue 1458 parts = line.strip().split('\t') 1459 chrom, start, end, *optional = parts 1460 1461 if chrom not in records: 1462 records[chrom] = {'locus': chrom, 'features': {}} 1463 feature_type = 'region' 1464 if feature_type not in records[chrom]['features']: 1465 records[chrom]['features'][feature_type] = {} 1466 1467 feature_id = len(records[chrom]['features'][feature_type]) 1468 feature = { 1469 'location': [[int(start), int(end)]], # BED uses 0-based start, convert to 1-based 1470 'strand': '+', # assume '+' if not present 1471 } 1472 1473 # Handle optional columns (name, score, strand) --> ignore 7-12 1474 if len(optional) >= 1: 1475 feature['name'] = optional[0] 1476 if len(optional) >= 2: 1477 feature['score'] = optional[1] 1478 if len(optional) >= 3: 1479 feature['strand'] = optional[2] 1480 1481 records[chrom]['features'][feature_type][feature_id] = feature 1482 1483 return records 1484 1485 parse_functions: Dict[str, Callable[[str], dict]] = { 1486 'gb': parse_gb, 1487 'bed': parse_bed, 1488 'gff': parse_gff, 1489 } 1490 # determine the annotation content -> should be standard formatted 1491 try: 1492 annotation_type = detect_annotation_type(annotation_path) 1493 except ValueError as err: 1494 raise err 1495 1496 # read in the annotation 1497 annotations = parse_functions[annotation_type](annotation_path) 1498 1499 # sanity check whether one of the annotation ids and alignment ids match 1500 annotation_found = False 1501 for annotation in annotations.keys(): 1502 for aln_id in aln.alignment.keys(): 1503 aln_id_sanitized = aln_id.split(' ')[0] 1504 # check in both directions 1505 if aln_id_sanitized in annotation: 1506 annotation_found = True 1507 break 1508 if annotation in aln_id_sanitized: 1509 annotation_found = True 1510 break 1511 1512 if not annotation_found: 1513 raise ValueError(f'the annotations of {annotation_path} do not match any ids in the MSA') 1514 1515 # return only the annotation that has been found, the respective type and the seq_id to map to 1516 return annotation_type, aln_id, annotations[annotation]['locus'], annotations[annotation]['features'] 1517 1518 1519 def _build_position_map(self) -> Dict[int, int]: 1520 """ 1521 build a position map from a sequence. 1522 1523 :return genomic position: gapped position 1524 """ 1525 1526 position_map = {} 1527 genomic_pos = 0 1528 for aln_index, char in enumerate(self._gapped_seq): 1529 if char != '-': 1530 position_map[genomic_pos] = aln_index 1531 genomic_pos += 1 1532 # ensure the last genomic position is included 1533 position_map[genomic_pos] = len(self._gapped_seq) 1534 1535 return position_map 1536 1537 1538 def _map_to_alignment(self): 1539 """ 1540 Adjust all feature locations to alignment positions 1541 """ 1542 1543 def map_location(position_map: Dict[int, int], locations: list) -> list: 1544 """ 1545 Map genomic locations to alignment positions using a precomputed position map. 1546 1547 :param position_map: Positions mapped from gapped to ungapped 1548 :param locations: List of genomic start and end positions. 1549 :return: List of adjusted alignment positions. 
1550 """ 1551 1552 aligned_locs = [] 1553 for start, end in locations: 1554 try: 1555 aligned_start = position_map[start] 1556 aligned_end = position_map[end] 1557 aligned_locs.append([aligned_start, aligned_end]) 1558 except KeyError: 1559 raise ValueError(f"Positions {start}-{end} lie outside of the position map.") 1560 1561 return aligned_locs 1562 1563 for feature_type, features in self.features.items(): 1564 for feature_id, feature_data in features.items(): 1565 original_locations = feature_data['location'] 1566 aligned_locations = map_location(self._position_map, original_locations) 1567 feature_data['location'] = aligned_locations
class MSA:
    """
    An alignment class that allows computation of several stats
    """

    def __init__(self, alignment_string: str, reference_id: str = None, zoom_range: tuple | int = None):
        """
        Initialise an Alignment object.
        :param alignment_string: Path to alignment file or raw alignment string
        :param reference_id: reference id
        :param zoom_range: start and stop positions to zoom into the alignment
        """
        self._alignment = self._read_alignment(alignment_string)
        self._reference_id = self._validate_ref(reference_id, self._alignment)
        self._zoom = self._validate_zoom(zoom_range, self._alignment)
        self._aln_type = self._determine_aln_type(self._alignment)

    # TODO: read in different alignment types
    # Static methods
    @staticmethod
    def _read_alignment(file_path: str) -> dict:
        """
        Parse MSA alignment file.
        :param file_path: path to alignment file
        :return: dictionary with ids as keys and sequences as values
        """

        def add_seq(aln: dict, sequence_id: str, seq_list: list):
            """
            Add a complete sequence and check for non-allowed chars.
            :param aln: alignment dictionary to build
            :param sequence_id: sequence id to add
            :param seq_list: sequence chunks to join and add
            :return: alignment with the added sequence
            """
            final_seq = ''.join(seq_list).upper()
            # check for non-allowed characters
            invalid_chars = set(final_seq) - set(config.POSSIBLE_CHARS)
            if invalid_chars:
                raise ValueError(
                    f"{sequence_id} contains invalid characters: {', '.join(invalid_chars)}. Allowed chars are: {config.POSSIBLE_CHARS}"
                )
            aln[sequence_id] = final_seq

            return aln

        alignment, seq_lines = {}, []
        seq_id = None

        with _get_line_iterator(file_path) as file:
            for i, line in enumerate(file):
                line = line.strip()
                # initial check for fasta format
                if i == 0 and not line.startswith(">"):
                    raise ValueError('Alignment has to be in fasta format starting with >SeqID.')
                if line.startswith(">"):
                    if seq_id:
                        alignment = add_seq(alignment, seq_id, seq_lines)
                    # initialize a new sequence
                    seq_id, seq_lines = line[1:], []
                else:
                    seq_lines.append(line)
        # handle last sequence
        if seq_id:
            alignment = add_seq(alignment, seq_id, seq_lines)
        # final sanity checks
        if alignment:
            # alignment contains only one sequence
            if len(alignment) < 2:
                raise ValueError("Alignment must contain more than one sequence.")
            # alignment sequences are not the same length
            first_seq_len = len(next(iter(alignment.values())))
            for sequence_id, sequence in alignment.items():
                if len(sequence) != first_seq_len:
                    raise ValueError(
                        f"All alignment sequences must have the same length. Sequence '{sequence_id}' has length {len(sequence)}, expected {first_seq_len}."
                    )
            # all checks passed
            return alignment
        else:
            raise ValueError(f"Alignment file {file_path} does not contain any sequences in fasta format.")

    @staticmethod
    def _validate_ref(reference: str | None, alignment: dict) -> str | None:
        """
        Validate that the reference seq id is indeed part of the alignment.
        :param reference: reference seq id
        :param alignment: alignment dict
        :return: validated reference
        """
        if reference in alignment.keys():
            return reference
        elif reference is None:
            return reference
        else:
            raise ValueError('Reference not in alignment.')

    @staticmethod
    def _validate_zoom(zoom: tuple | int, original_aln: dict) -> tuple | None:
        """
        Validate that the user-defined zoom range is within the start and end of the
        initial alignment.
        :param zoom: zoom range or zoom start
        :param original_aln: non-zoomed alignment dict
        :return: validated zoom range
        """
        if zoom is not None:
            aln_length = len(original_aln[list(original_aln.keys())[0]])
            # if only one value is provided -> stop is the last alignment position
            if isinstance(zoom, int):
                if 0 <= zoom < aln_length:
                    return zoom, aln_length - 1
                else:
                    raise ValueError('Zoom start must be within the alignment length range.')
            # check that exactly 2 values are provided
            if len(zoom) != 2:
                raise ValueError('Zoom positions have to be (zoom_start, zoom_end).')
            # validate zoom start/stop
            for position in zoom:
                if type(position) != int:
                    raise ValueError('Zoom positions have to be integers.')
                if position not in range(0, aln_length):
                    raise ValueError('Zoom position out of range.')

        return zoom

    @staticmethod
    def _determine_aln_type(alignment) -> str:
        """
        Determine the most likely type of alignment:
        if at least 70% of chars in the alignment are nucleotide
        chars, it is most likely a nt alignment.
        :return: type of alignment
        """
        counter = int()
        for record in alignment:
            if 'U' in alignment[record]:
                return 'RNA'
            counter += sum(map(alignment[record].count, ['A', 'C', 'G', 'T', 'N', '-']))
        # determine which is the most likely type
        if counter / len(alignment) >= 0.7 * len(alignment[list(alignment.keys())[0]]):
            return 'DNA'
        else:
            return 'AA'

    # Properties with setters
    @property
    def reference_id(self):
        return self._reference_id

    @reference_id.setter
    def reference_id(self, ref_id: str):
        """
        Set and validate the reference id.
        """
        self._reference_id = self._validate_ref(ref_id, self.alignment)

    @property
    def zoom(self) -> tuple:
        return self._zoom

    @zoom.setter
    def zoom(self, zoom_pos: tuple | int):
        """
        Validate the user-defined zoom range.
        """
        self._zoom = self._validate_zoom(zoom_pos, self._alignment)

    # Property without setter
    @property
    def aln_type(self) -> str:
        """
        The alignment type: RNA, DNA or AA.
        """
        return self._aln_type

    # On-the-fly properties without setters
    @property
    def length(self) -> int:
        return len(next(iter(self.alignment.values())))

    @property
    def alignment(self) -> dict:
        """
        (zoomed) version of the alignment.
        """
        if self.zoom is not None:
            zoomed_aln = dict()
            for seq in self._alignment:
                zoomed_aln[seq] = self._alignment[seq][self.zoom[0]:self.zoom[1]]
            return zoomed_aln
        else:
            return self._alignment

    # functions for different alignment stats
    def get_reference_coords(self) -> tuple[int, int]:
        """
        Determine the start and end coordinates of the reference sequence,
        defined as the first/last nucleotide in the reference sequence
        (excluding N and gaps).

        :return: start, end
        """
        start, end = 0, self.length

        if self.reference_id is None:
            return start, end
        else:
            # 5' --> 3'
            for start in range(self.length):
                if self.alignment[self.reference_id][start] not in ['-', 'N']:
                    break
            # 3' --> 5'
            for end in range(self.length - 1, 0, -1):
                if self.alignment[self.reference_id][end] not in ['-', 'N']:
                    break

        return start, end

    def get_consensus(self, threshold: float = None, use_ambig_nt: bool = False) -> str:
        """
        Creates a non-gapped consensus sequence.

        :param threshold: Threshold for the consensus sequence. If use_ambig_nt = True the ambig. char that encodes
            the nucleotides that reach a cumulative frequency >= threshold is used. Otherwise 'N' (for nt alignments)
            or 'X' (for aa alignments) is used if none of the characters reach a cumulative frequency >= threshold.
        :param use_ambig_nt: Use an ambiguous nt character if none of the possible nt at an alignment position
            has a frequency above the defined threshold.
        :return: consensus sequence
        """

        # helper functions
        def determine_counts(alignment_dict: dict, position: int) -> dict:
            """
            Count the number of each char at an idx of the alignment and return a sorted dict.
            Handles ambiguous nucleotides as well as gaps.
            """
            nucleotide_list = []

            # get all nucleotides
            for sequence in alignment_dict.items():
                nucleotide_list.append(sequence[1][position])
            # count occurrences of nucleotides
            counter = dict(collections.Counter(nucleotide_list))
            # get permutations of an ambiguous nucleotide
            to_delete = []
            temp_dict = {}
            for nucleotide in counter:
                if nucleotide in config.AMBIG_CHARS[self.aln_type]:
                    to_delete.append(nucleotide)
                    permutations = config.AMBIG_CHARS[self.aln_type][nucleotide]
                    adjusted_freq = 1 / len(permutations)
                    for permutation in permutations:
                        if permutation in temp_dict:
                            temp_dict[permutation] += adjusted_freq
                        else:
                            temp_dict[permutation] = adjusted_freq

            # drop ambiguous entries and add the adjusted freqs to the counter
            if to_delete:
                for i in to_delete:
                    counter.pop(i)
                for nucleotide in temp_dict:
                    if nucleotide in counter:
                        counter[nucleotide] += temp_dict[nucleotide]
                    else:
                        counter[nucleotide] = temp_dict[nucleotide]

            return dict(sorted(counter.items(), key=lambda x: x[1], reverse=True))

        def get_consensus_char(counts: dict, cutoff: float) -> list:
            """
            Get a list of nucleotides for the consensus seq.
            """
            n = 0

            consensus_chars = []
            for char in counts:
                n += counts[char]
                consensus_chars.append(char)
                if n >= cutoff:
                    break

            return consensus_chars

        def get_ambiguous_char(nucleotides: list) -> str:
            """
            Get the ambiguous char for a list of nucleotides.
            """
            for ambiguous, permutations in config.AMBIG_CHARS[self.aln_type].items():
                if set(permutations) == set(nucleotides):
                    break

            return ambiguous

        # check if params have been set correctly
        if threshold is not None:
            if threshold < 0 or threshold > 1:
                raise ValueError('Threshold must be between 0 and 1.')
        if self.aln_type == 'AA' and use_ambig_nt:
            raise ValueError('Ambiguous characters can not be calculated for amino acid alignments.')
        if threshold is None and use_ambig_nt:
            raise ValueError('To calculate ambiguous nucleotides, set a threshold > 0.')

        alignment = self.alignment
        consensus = str()

        if threshold is not None:
            consensus_cutoff = len(alignment) * threshold
        else:
            consensus_cutoff = 0

        # build the consensus sequence
        for idx in range(self.length):
            char_counts = determine_counts(alignment, idx)
            consensus_chars = get_consensus_char(
                char_counts,
                consensus_cutoff
            )
            if threshold != 0:
                if len(consensus_chars) > 1:
                    if use_ambig_nt:
                        char = get_ambiguous_char(consensus_chars)
                    else:
                        if self.aln_type == 'AA':
                            char = 'X'
                        else:
                            char = 'N'
                    consensus = consensus + char
                else:
                    consensus = consensus + consensus_chars[0]
            else:
                consensus = consensus + consensus_chars[0]

        return consensus

    def get_conserved_orfs(self, min_length: int = 100, identity_cutoff: float | None = None) -> dict:
        """
        **conserved ORF definition:**

        - conserved starts and stops
        - start and stop must be in the same frame
        - stop - start must be at least min_length
        - all ungapped seqs[start:stop] must have at least min_length
        - no ungapped seq can have a stop codon between start and stop

        Conservation is measured as the number of positions with identical characters
        divided by the length of the ORF slice of the alignment.

        **Algorithm overview:**

        - check for conserved start and stop codons
        - iterate over all three frames
        - check each start and the next sufficiently far away stop codon
        - check if all ungapped seqs between start and stop codon are >= min_length
        - check that no ungapped seq in the alignment has an internal stop codon
        - write to dictionary
        - classify as internal if the stop codon has already been written with a prior start
        - repeat for the reverse complement

        :return: ORF positions and internal ORF positions
        """

        # helper functions
        def determine_conserved_start_stops(alignment: dict, alignment_length: int) -> tuple:
            """
            Determine all conserved start and stop codons within an alignment.
            :param alignment: alignment
            :param alignment_length: length of alignment
            :return: start and stop codon positions
            """
            starts = config.START_CODONS[self.aln_type]
            stops = config.STOP_CODONS[self.aln_type]

            list_of_starts, list_of_stops = [], []
            ref = alignment[list(alignment.keys())[0]]
            for nt_position in range(alignment_length):
                if ref[nt_position:nt_position + 3] in starts:
                    conserved_start = True
                    for sequence in alignment:
                        if not alignment[sequence][nt_position:].replace('-', '')[0:3] in starts:
                            conserved_start = False
                            break
                    if conserved_start:
                        list_of_starts.append(nt_position)

                if ref[nt_position:nt_position + 3] in stops:
                    conserved_stop = True
                    for sequence in alignment:
                        if not alignment[sequence][nt_position:].replace('-', '')[0:3] in stops:
                            conserved_stop = False
                            break
                    if conserved_stop:
                        list_of_stops.append(nt_position)

            return list_of_starts, list_of_stops

        def get_ungapped_sliced_seqs(alignment: dict, start_pos: int, stop_pos: int) -> list:
            """
            Slice sequences between start and stop codons and eliminate gaps.
            :param alignment: alignment
            :param start_pos: start codon
            :param stop_pos: stop codon
            :return: sliced sequences
            """
            ungapped_seqs = []
            for seq_id in alignment:
                ungapped_seqs.append(alignment[seq_id][start_pos:stop_pos + 3].replace('-', ''))

            return ungapped_seqs

        def additional_stops(ungapped_seqs: list) -> bool:
            """
            Check for the presence of an internal stop codon.
            :param ungapped_seqs: list of ungapped sequences
            :return: additional stop codons present (True/False)
            """
            stops = config.STOP_CODONS[self.aln_type]

            for sliced_seq in ungapped_seqs:
                for position in range(0, len(sliced_seq) - 3, 3):
                    if sliced_seq[position:position + 3] in stops:
                        return True
            return False

        def calculate_identity(identity_matrix: ndarray, aln_slice: list) -> float:
            sliced_array = identity_matrix[:, aln_slice[0]:aln_slice[1]] + 1  # identical = 0, different = -1 --> add 1
            return np.sum(np.all(sliced_array == 1, axis=0)) / (aln_slice[1] - aln_slice[0]) * 100

        # checks for arguments
        if self.aln_type == 'AA':
            raise TypeError('ORF search only for RNA/DNA alignments')

        if identity_cutoff is not None:
            if identity_cutoff > 100 or identity_cutoff < 0:
                raise ValueError('conservation cutoff must be between 0 and 100')

        if min_length <= 6 or min_length > self.length:
            raise ValueError(f'min_length must be between 6 and {self.length}')

        # ini
        identities = self.calc_identity_alignment()
        alignments = [self.alignment, self.calc_reverse_complement_alignment()]
        aln_len = self.length

        orf_counter = 0
        orf_dict = {}

        for aln, direction in zip(alignments, ['+', '-']):
            # check for starts and stops in the first seq and then check if these are present in all seqs
            conserved_starts, conserved_stops = determine_conserved_start_stops(aln, aln_len)
            # check each frame
            for frame in (0, 1, 2):
                potential_starts = [x for x in conserved_starts if x % 3 == frame]
                potential_stops = [x for x in conserved_stops if x % 3 == frame]
                last_stop = -1
                for start in potential_starts:
                    # go to the next stop that is sufficiently far away in the alignment
                    next_stops = [x for x in potential_stops if x + 3 >= start + min_length]
                    if not next_stops:
                        continue
                    next_stop = next_stops[0]
                    ungapped_sliced_seqs = get_ungapped_sliced_seqs(aln, start, next_stop)
                    # re-check the lengths of all ungapped seqs
                    ungapped_seq_lengths = [len(x) >= min_length for x in ungapped_sliced_seqs]
                    if not all(ungapped_seq_lengths):
                        continue
                    # if no stop codon between start and stop --> write to dictionary
                    if not additional_stops(ungapped_sliced_seqs):
                        if direction == '+':
                            positions = [start, next_stop + 3]
                        else:
                            positions = [aln_len - next_stop - 3, aln_len - start]
                        if last_stop != next_stop:
                            last_stop = next_stop
                            conservation = calculate_identity(identities, positions)
                            if identity_cutoff is not None and conservation < identity_cutoff:
                                continue
                            orf_dict[f'ORF_{orf_counter}'] = {'location': [positions],
                                                              'frame': frame,
                                                              'strand': direction,
                                                              'conservation': conservation,
                                                              'internal': []
                                                              }
                            orf_counter += 1
                        else:
                            if orf_dict:
                                orf_dict[f'ORF_{orf_counter - 1}']['internal'].append(positions)

        return orf_dict

    def get_non_overlapping_conserved_orfs(self, min_length: int = 100, identity_cutoff: float = None) -> dict:
        """
        First calculates all conserved ORFs and then searches from the 5' end for all
        non-overlapping ORFs on the forward strand and from the 3' end for all
        non-overlapping ORFs on the reverse strand.

        **No overlap algorithm:**

        **frame 1:** -[M------*]--- ----[M--*]---------[M-----

        **frame 2:** -------[M------*]---------[M---*]--------

        **frame 3:** [M---*]-----[M----------*]----------[M---

        **results:** [M---*][M------*]--[M--*]-[M---*]-[M-----

        frame:        3      2          1       2       1

        :return: dictionary with non-overlapping ORFs
        """
        orf_dict = self.get_conserved_orfs(min_length, identity_cutoff)

        fw_orfs, rw_orfs = [], []

        for orf in orf_dict:
            if orf_dict[orf]['strand'] == '+':
                fw_orfs.append((orf, orf_dict[orf]['location'][0]))
            else:
                rw_orfs.append((orf, orf_dict[orf]['location'][0]))

        fw_orfs.sort(key=lambda x: x[1][0])  # sort by start pos
        rw_orfs.sort(key=lambda x: x[1][1], reverse=True)  # sort by stop pos
        non_overlapping_orfs = []
        for orf_list, strand in zip([fw_orfs, rw_orfs], ['+', '-']):
            previous_stop = -1 if strand == '+' else self.length + 1
            for orf in orf_list:
                if strand == '+' and orf[1][0] > previous_stop:
                    non_overlapping_orfs.append(orf[0])
                    previous_stop = orf[1][1]
                elif strand == '-' and orf[1][1] < previous_stop:
                    non_overlapping_orfs.append(orf[0])
                    previous_stop = orf[1][0]

        non_overlap_dict = {}
        for orf in orf_dict:
            if orf in non_overlapping_orfs:
                non_overlap_dict[orf] = orf_dict[orf]

        return non_overlap_dict

    def calc_length_stats(self) -> dict:
        """
        Determine the stats for the length of the ungapped seqs in the alignment.
        :return: dictionary with length stats
        """

        seq_lengths = [len(self.alignment[x].replace('-', '')) for x in self.alignment]

        return {'number of seq': len(self.alignment),
                'mean length': float(np.mean(seq_lengths)),
                'std length': float(np.std(seq_lengths)),
                'min length': int(np.min(seq_lengths)),
                'max length': int(np.max(seq_lengths))
                }

    def calc_entropy(self) -> list:
        """
        Calculate the normalized Shannon's entropy for every position in an alignment:

        - 1: high entropy
        - 0: low entropy

        :return: Entropies at each position.
        """

        # helper functions
        def shannons_entropy(character_list: list, states: int, aln_type: str) -> float:
            """
            Calculate the Shannon's entropy of a sequence,
            normalized between 0 and 1.
            :param character_list: characters at an alignment position
            :param states: number of potential characters that can be present
            :param aln_type: type of the alignment
            :returns: entropy
            """
            ent, n_chars = 0, len(character_list)
            # only one char is in the list
            if n_chars <= 1:
                return ent
            # calculate the number of unique chars and their counts
            chars, char_counts = np.unique(character_list, return_counts=True)
            char_counts = char_counts.astype(float)
            # ignore gaps for entropy calc
            char_counts, chars = char_counts[chars != "-"], chars[chars != "-"]
            # correctly handle ambiguous chars
            index_to_drop = []
            for index, char in enumerate(chars):
                if char in config.AMBIG_CHARS[aln_type]:
                    index_to_drop.append(index)
                    amb_chars, amb_counts = np.unique(config.AMBIG_CHARS[aln_type][char], return_counts=True)
                    amb_counts = amb_counts / len(config.AMBIG_CHARS[aln_type][char])
                    # add the proportionate numbers to the initial array
                    for amb_char, amb_count in zip(amb_chars, amb_counts):
                        if amb_char in chars:
                            char_counts[chars == amb_char] += amb_count
                        else:
                            chars, char_counts = np.append(chars, amb_char), np.append(char_counts, amb_count)
            # drop the ambiguous characters from the array
            char_counts, chars = np.delete(char_counts, index_to_drop), np.delete(chars, index_to_drop)
            # calc the entropy
            probs = char_counts / n_chars
            if np.count_nonzero(probs) <= 1:
                return ent
            for prob in probs:
                ent -= prob * math.log(prob, states)

            return ent

        aln = self.alignment
        entropies = []

        if self.aln_type == 'AA':
            states = 20
        else:
            states = 4
        # iterate over alignment positions and the sequences
        for nuc_pos in range(self.length):
            pos = []
            for record in aln:
                pos.append(aln[record][nuc_pos])
            entropies.append(shannons_entropy(pos, states, self.aln_type))

        return entropies

    def calc_gc(self) -> list:
        """
        Determine the GC content for every position in a nt alignment.
        :return: GC content for every position.
        :raises: TypeError for AA alignments
        """
        if self.aln_type == 'AA':
            raise TypeError("GC computation is not possible for amino acid alignments")

        gc, aln, amb_nucs = [], self.alignment, config.AMBIG_CHARS[self.aln_type]

        for position in range(self.length):
            nucleotides = str()
            for record in aln:
                nucleotides = nucleotides + aln[record][position]
            # init dict defining which chars to count and with which weight
            to_count = {
                'G': 1,
                'C': 1,
            }
            # handle ambiguous nucleotides
            for char in amb_nucs:
                if char in nucleotides:
                    to_count[char] = (amb_nucs[char].count('C') + amb_nucs[char].count('G')) / len(amb_nucs[char])

            gc.append(
                sum([nucleotides.count(x) * to_count[x] for x in to_count]) / len(nucleotides)
            )

        return gc

    def calc_coverage(self) -> list:
        """
        Determine the coverage of every position in an alignment,
        defined as: 1 - (fraction of '-' characters at that position).

        :return: Coverage at each alignment position.
        """
        coverage, aln = [], self.alignment

        for nuc_pos in range(self.length):
            pos = str()
            for record in aln.keys():
                pos = pos + aln[record][nuc_pos]
            coverage.append(1 - pos.count('-') / len(pos))

        return coverage

    def calc_reverse_complement_alignment(self) -> dict:
        """
        Reverse complement the alignment.
        :return: Alignment (rv)
        """
        if self.aln_type == 'AA':
            raise TypeError('Reverse complement only for RNA or DNA.')

        aln = self.alignment
        reverse_complement_dict = {}

        for seq_id in aln:
            reverse_complement_dict[seq_id] = ''.join(config.COMPLEMENT[base] for base in reversed(aln[seq_id]))

        return reverse_complement_dict

    def calc_numerical_alignment(self, encode_mask: bool = False, encode_ambiguities: bool = False):
        """
        Transforms the alignment to numerical values. Ambiguities are encoded as -3, mask as -2 and the
        remaining chars with the idx + 1 of config.CHAR_COLORS[self.aln_type]['standard'].

        :param encode_ambiguities: encode ambiguities as -3
        :param encode_mask: encode mask ('N'/'X') as -2
        :return: numerical matrix
        """

        aln = self.alignment
        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
        # ini matrix
        numerical_matrix = np.full(sequences.shape, np.nan, dtype=float)
        # first encode the mask
        if encode_mask:
            if self.aln_type == 'AA':
                is_n_or_x = np.isin(sequences, ['X'])
            else:
                is_n_or_x = np.isin(sequences, ['N'])
            numerical_matrix[is_n_or_x] = -2
        # next encode ambiguous chars
        if encode_ambiguities:
            numerical_matrix[np.isin(sequences, [key for key in config.AMBIG_CHARS[self.aln_type] if key not in ['N', 'X', '-']])] = -3
        # next convert each char into its respective value
        for idx, char in enumerate(config.CHAR_COLORS[self.aln_type]['standard']):
            numerical_matrix[np.isin(sequences, [char])] = idx + 1

        return numerical_matrix

    def calc_identity_alignment(self, encode_mismatches: bool = True, encode_mask: bool = False, encode_gaps: bool = True, encode_ambiguities: bool = False, encode_each_mismatch_char: bool = False) -> np.ndarray:
        """
        Converts the alignment to an identity array (identical = 0) compared to the majority consensus or the reference:

        :param encode_mismatches: encode mismatches as -1
        :param encode_mask: encode mask ('N'/'X') with value=-2 --> also in the reference
        :param encode_gaps: encode gaps with np.nan --> also in the reference
        :param encode_ambiguities: encode ambiguities with value=-3
        :param encode_each_mismatch_char: encode each mismatch character separately - these values represent the idx+1 values of config.CHAR_COLORS[self.aln_type]['standard']
        :return: identity alignment
        """

        aln = self.alignment
        ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus()

        # convert alignment to array
        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
        reference = np.array(list(ref))
        # ini matrix
        identity_matrix = np.full(sequences.shape, 0, dtype=float)

        is_identical = sequences == reference

        if encode_gaps:
            is_gap = sequences == '-'
        else:
            is_gap = np.full(sequences.shape, False)

        if encode_mask:
            if self.aln_type == 'AA':
                is_n_or_x = np.isin(sequences, ['X'])
            else:
                is_n_or_x = np.isin(sequences, ['N'])
        else:
            is_n_or_x = np.full(sequences.shape, False)

        if encode_ambiguities:
            is_ambig = np.isin(sequences, [key for key in config.AMBIG_CHARS[self.aln_type] if key not in ['N', 'X', '-']])
        else:
            is_ambig = np.full(sequences.shape, False)

        if encode_mismatches:
            is_mismatch = ~is_gap & ~is_identical & ~is_n_or_x & ~is_ambig
        else:
            is_mismatch = np.full(sequences.shape, False)

        # encode every different character
        if encode_each_mismatch_char:
            for idx, char in enumerate(config.CHAR_COLORS[self.aln_type]['standard']):
                new_encoding = np.isin(sequences, [char]) & is_mismatch
                identity_matrix[new_encoding] = idx + 1
        # or encode differences with a single value
        else:
            identity_matrix[is_mismatch] = -1  # mismatch

        identity_matrix[is_gap] = np.nan  # gap
        identity_matrix[is_n_or_x] = -2  # 'N' or 'X'
        identity_matrix[is_ambig] = -3  # ambiguities

        return identity_matrix

    def calc_similarity_alignment(self, matrix_type: str | None = None, normalize: bool = True) -> np.ndarray:
        """
        Calculate the similarity score between the alignment and the reference sequence, with normalization to highlight
        differences. The similarity scores are scaled to the range [0, 1] based on the substitution matrix values for the
        reference residue at each column. Gaps are encoded as np.nan.

        The calculation follows these steps:

        1. **Reference Sequence**: If a reference sequence is provided (via `self.reference_id`), it is used. Otherwise,
           a consensus sequence is generated to serve as the reference.
        2. **Substitution Matrix**: The similarity between residues is determined using a substitution matrix, such as
           BLOSUM65 for amino acids or BLASTN for nucleotides. The matrix is loaded based on the alignment type.
        3. **Per-Column Normalization (optional)**:

           For each column in the alignment:
           - The residue in the reference sequence is treated as the baseline for that column.
           - The substitution scores for the reference residue are extracted from the substitution matrix.
           - The scores are normalized to the range [0, 1] using the minimum and maximum possible scores for the reference residue.
           - This ensures that identical residues (or those with high similarity to the reference) have high scores,
             while more dissimilar residues have lower scores.
        4. **Output**:

           - The normalized similarity scores are stored in a NumPy array.
           - Gaps (if any) or residues not present in the substitution matrix are encoded as `np.nan`.

        :param matrix_type: type of substitution matrix (if not set - AA: BLOSUM65, RNA/DNA: TRANS)
        :param normalize: whether to normalize the similarity scores to the range [0, 1]
        :return: A 2D NumPy array where each entry corresponds to the normalized similarity score between the aligned residue
            and the reference residue for that column. Values range from 0 (low similarity) to 1 (high similarity).
            Gaps and invalid residues are encoded as `np.nan`.
        :raise: ValueError if the specified substitution matrix is not available for the given alignment type.
        """

        aln = self.alignment
        ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus()
        if matrix_type is None:
            if self.aln_type == 'AA':
                matrix_type = 'BLOSUM65'
            else:
                matrix_type = 'TRANS'
        # load substitution matrix as dictionary
        try:
            subs_matrix = config.SUBS_MATRICES[self.aln_type][matrix_type]
        except KeyError:
            raise ValueError(
                f'The specified matrix does not exist for alignment type.\nAvailable matrices for {self.aln_type} are:\n{list(config.SUBS_MATRICES[self.aln_type].keys())}'
            )

        # set dtype and convert alignment to a NumPy array for vectorized processing
        dtype = np.dtype(float, metadata={'matrix': matrix_type})
        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
        reference = np.array(list(ref))
        valid_chars = list(subs_matrix.keys())
        similarity_array = np.full(sequences.shape, np.nan, dtype=dtype)

        for j, ref_char in enumerate(reference):
            if ref_char not in valid_chars + ['-']:
                continue
            # get local min and max for the reference residue
            if normalize and ref_char != '-':
                local_scores = subs_matrix[ref_char].values()
                local_min, local_max = min(local_scores), max(local_scores)

            for i, char in enumerate(sequences[:, j]):
                if char not in valid_chars:
                    continue
                # classify the similarity as max if the reference has a gap
                similarity_score = subs_matrix[char][ref_char] if ref_char != '-' else 1
                similarity_array[i, j] = (similarity_score - local_min) / (local_max - local_min) if normalize and ref_char != '-' else similarity_score

        return similarity_array

    def calc_position_matrix(self, matrix_type: str = 'PWM') -> np.ndarray:
        """
        Calculates a position matrix of the specified type for the given alignment. The function
        supports generating matrices of the types Position Frequency Matrix (PFM), Position Probability
        Matrix (PPM), Position Weight Matrix (PWM) and cumulative Information Content (IC). It validates
        the provided matrix type and includes pseudo-count adjustments to ensure robust calculations.

        :param matrix_type: Type of position matrix to calculate. Accepted values are 'PFM', 'PPM',
            'PWM', and 'IC'. Defaults to 'PWM'.
        :type matrix_type: str
        :raises ValueError: If the provided `matrix_type` is not one of the accepted values.
        :return: A numpy array representing the calculated position matrix of the specified type.
        :rtype: np.ndarray
        """

        # ini
        aln = self.alignment
        if matrix_type not in ['PFM', 'PPM', 'IC', 'PWM']:
            raise ValueError('Matrix_type must be PFM, PPM, IC or PWM.')
        possible_chars = list(config.CHAR_COLORS[self.aln_type]['standard'].keys())
        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])

        # calc position frequency matrix
        pfm = np.array([np.sum(sequences == char, 0) for char in possible_chars])
        if matrix_type == 'PFM':
            return pfm

        # calc position probability matrix (probability)
        pseudo_count = 0.0001  # to avoid 0 values
        pfm = pfm + pseudo_count
        ppm_non_char_excluded = pfm / np.sum(pfm, axis=0)  # use this for pwm/ic calculation
        ppm = pfm / len(aln.keys())  # calculate the frequency based on the number of sequences
        if matrix_type == 'PPM':
            return ppm

        # calc position weight matrix (log-likelihood)
        pwm = np.log2(ppm_non_char_excluded * len(possible_chars))
        if matrix_type == 'PWM':
            return pwm

        # calc information content per position (in bits) - can be used to scale a ppm for sequence logos
        ic = np.sum(ppm_non_char_excluded * pwm, axis=0)
        return ic

    def calc_percent_recovery(self) -> dict:
        """
        Recovery per sequence, either compared to the majority consensus seq
        or the reference seq.

        Defined as:

        `(1 - (number of N/X/'-' chars within ungapped reference regions) / (length of the ungapped reference)) * 100`

        This is highly similar to how nextclade calculates recovery over reference.

        :return: dict
        """

        aln = self.alignment

        if self.reference_id is not None:
            ref = aln[self.reference_id]
        else:
            ref = self.get_consensus()  # majority consensus

        if not any(char != '-' for char in ref):
            raise ValueError("Reference sequence is entirely gapped, cannot calculate recovery.")

        # count 'N', 'X' and '-' chars in non-gapped regions
        recovery_over_ref = dict()

        # get positions of non-gap characters in the reference
        non_gap_positions = [i for i, char in enumerate(ref) if char != '-']
        cumulative_length = len(non_gap_positions)

        # calculate recovery
        for seq_id in aln:
            if seq_id == self.reference_id:
                continue
            seq = aln[seq_id]
            count_invalid = sum(
                seq[pos] == '-' or
                (seq[pos] == 'X' if self.aln_type == "AA" else seq[pos] == 'N')
                for pos in non_gap_positions
            )
            recovery_over_ref[seq_id] = (1 - count_invalid / cumulative_length) * 100

        return recovery_over_ref

    def calc_character_frequencies(self) -> dict:
        """
        Calculate the percentage of characters in the alignment:
        The frequencies are counted per seq and in total. The
        percentage of non-gap characters in the alignment is
        relative to the total number of non-gap characters.
        The gap percentage is relative to the sequence length.

        The output is a nested dictionary.

        :return: Character frequencies
        """

        aln, aln_length = self.alignment, self.length

        freqs = {'total': {'-': {'counts': 0, '% of alignment': float()}}}

        for seq_id in aln:
            freqs[seq_id], all_chars = {'-': {'counts': 0, '% of alignment': float()}}, 0
            unique_chars = set(aln[seq_id])
            for char in unique_chars:
                if char == '-':
                    continue
                # add characters to dictionaries
                if char not in freqs[seq_id]:
                    freqs[seq_id][char] = {'counts': 0, '% of non-gapped': 0}
                if char not in freqs['total']:
                    freqs['total'][char] = {'counts': 0, '% of non-gapped': 0}
                # count non-gap chars
                freqs[seq_id][char]['counts'] += aln[seq_id].count(char)
                freqs['total'][char]['counts'] += freqs[seq_id][char]['counts']
                all_chars += freqs[seq_id][char]['counts']
            # normalize counts
            for char in freqs[seq_id]:
                if char == '-':
                    continue
                freqs[seq_id][char]['% of non-gapped'] = freqs[seq_id][char]['counts'] / all_chars * 100
                freqs['total'][char]['% of non-gapped'] += freqs[seq_id][char]['% of non-gapped']
            # count gaps
            freqs[seq_id]['-']['counts'] = aln[seq_id].count('-')
            freqs['total']['-']['counts'] += freqs[seq_id]['-']['counts']
            # normalize gap counts
            freqs[seq_id]['-']['% of alignment'] = freqs[seq_id]['-']['counts'] / aln_length * 100
            freqs['total']['-']['% of alignment'] += freqs[seq_id]['-']['% of alignment']

        # normalize the total counts
        for char in freqs['total']:
            for value in freqs['total'][char]:
                if value == '% of alignment' or value == '% of non-gapped':
                    freqs['total'][char][value] = freqs['total'][char][value] / len(aln)

        return freqs

    def calc_pairwise_identity_matrix(self, distance_type: str = 'ghd') -> ndarray:
        """
        Calculate pairwise identities for an alignment. As there are different definitions of
        sequence identity, several options are implemented:

        **1) ghd (global hamming distance)**: At each alignment position, check if characters match:

        identity = matches / alignment_length * 100

        **2) lhd (local hamming distance)**: Restrict the alignment to the region in both sequences that does not start and end with gaps:

        identity = matches / min(5'3' ungapped seq1, 5'3' ungapped seq2) * 100

        **3) ged (gap excluded distance)**: All gaps are excluded from the alignment:

        identity = matches / (matches + mismatches) * 100

        **4) gcd (gap compressed distance)**: All consecutive gaps are compressed to one mismatch:

        identity = matches / gap_compressed_alignment_length * 100

        :return: array with pairwise identities.
        """

        def hamming_distance(seq1: str, seq2: str) -> int:
            return sum(c1 == c2 for c1, c2 in zip(seq1, seq2))

        def ghd(seq1: str, seq2: str) -> float:
            return hamming_distance(seq1, seq2) / self.length * 100

        def lhd(seq1, seq2):
            # trim gaps from both sides
            i, j = 0, self.length - 1
            while i < self.length and (seq1[i] == '-' or seq2[i] == '-'):
                i += 1
            while j >= 0 and (seq1[j] == '-' or seq2[j] == '-'):
                j -= 1
            if i > j:
                return 0.0

            seq1_, seq2_ = seq1[i:j + 1], seq2[i:j + 1]
            matches = sum(c1 == c2 for c1, c2 in zip(seq1_, seq2_))
            length = j - i + 1
            return (matches / length) * 100 if length > 0 else 0.0

        def ged(seq1: str, seq2: str) -> float:
            matches, mismatches = 0, 0

            for c1, c2 in zip(seq1, seq2):
                if c1 != '-' and c2 != '-':
                    if c1 == c2:
                        matches += 1
                    else:
                        mismatches += 1
            return matches / (matches + mismatches) * 100 if (matches + mismatches) > 0 else 0

        def gcd(seq1: str, seq2: str) -> float:
            matches = 0
            mismatches = 0
            in_gap = False

            for char1, char2 in zip(seq1, seq2):
                if char1 == '-' and char2 == '-':  # shared gap: do nothing
                    continue
                elif char1 == '-' or char2 == '-':  # gap in only one sequence
                    if not in_gap:  # start of a new gap stretch
                        mismatches += 1
                        in_gap = True
                else:  # no gaps
                    in_gap = False
                    if char1 == char2:  # matching characters
                        matches += 1
                    else:  # mismatched characters
                        mismatches += 1

            return matches / (matches + mismatches) * 100 if (matches + mismatches) > 0 else 0

        # map distance type to corresponding function
        distance_functions: Dict[str, Callable[[str, str], float]] = {
            'ghd': ghd,
            'lhd': lhd,
            'ged': ged,
            'gcd': gcd
        }

        if distance_type not in distance_functions:
            raise ValueError(f"Invalid distance type '{distance_type}'. Choose from {list(distance_functions.keys())}.")

        # compute pairwise distances
        aln = self.alignment
        distance_func = distance_functions[distance_type]
        distance_matrix = np.zeros((len(aln), len(aln)))

        sequences = list(aln.values())
        n = len(sequences)
        for i in range(n):
            seq1 = sequences[i]
            for j in range(i, n):
                seq2 = sequences[j]
                dist = distance_func(seq1, seq2)
                distance_matrix[i, j] = dist
                distance_matrix[j, i] = dist

        return distance_matrix

    def get_snps(self, include_ambig: bool = False) -> dict:
        """
        Calculate SNPs similar to snp-sites (output is comparable):
        https://github.com/sanger-pathogens/snp-sites
        Importantly, positions are only considered if at least one of the variant characters is not ambiguous.
        The SNPs are called against a majority consensus sequence, or against the reference if one has been set.

        :param include_ambig: Include ambiguous snps (default: False)
        :return: dictionary containing snp positions and their variants including their frequency.
        """
        aln = self.alignment
        ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus()
        aln = {x: aln[x] for x in aln.keys() if x != self.reference_id}
        seq_ids = list(aln.keys())
        snp_dict = {'#CHROM': self.reference_id if self.reference_id is not None else 'consensus', 'POS': {}}

        for pos in range(self.length):
            reference_char = ref[pos]
            if not include_ambig:
                if reference_char in config.AMBIG_CHARS[self.aln_type] and reference_char != '-':
                    continue
            alt_chars, snps = [], []
            for i, seq_id in enumerate(aln.keys()):
                alt_chars.append(aln[seq_id][pos])
                if reference_char != aln[seq_id][pos]:
                    snps.append(i)
            if not snps:
                continue
            if include_ambig:
                if all(alt_chars[x] in config.AMBIG_CHARS[self.aln_type] for x in snps):
                    continue
            else:
                snps = [x for x in snps if alt_chars[x] not in config.AMBIG_CHARS[self.aln_type]]
                if not snps:
                    continue
            if pos not in snp_dict['POS']:
                snp_dict['POS'][pos] = {'ref': reference_char, 'ALT': {}}
            for snp in snps:
                if alt_chars[snp] not in snp_dict['POS'][pos]['ALT']:
                    snp_dict['POS'][pos]['ALT'][alt_chars[snp]] = {
                        'AF': 1,
                        'SEQ_ID': [seq_ids[snp]]
                    }
                else:
                    snp_dict['POS'][pos]['ALT'][alt_chars[snp]]['AF'] += 1
                    snp_dict['POS'][pos]['ALT'][alt_chars[snp]]['SEQ_ID'].append(seq_ids[snp])
            # calculate AF
            if pos in snp_dict['POS']:
                for alt in snp_dict['POS'][pos]['ALT']:
                    snp_dict['POS'][pos]['ALT'][alt]['AF'] /= len(aln)

        return snp_dict

    def calc_transition_transversion_score(self) -> list:
        """
        Based on the snp positions, calculates a transition/transversion score.
        A positive score means a higher ratio of transitions and a negative score
        means a higher ratio of transversions.
        :return: list
        """

        if self.aln_type == 'AA':
            raise TypeError('TS/TV scoring only for RNA/DNA alignments')

        # ini
        snps = self.get_snps()
        score = [0] * self.length

        for pos in snps['POS']:
            for alt in snps['POS'][pos]['ALT']:
                # check the type of substitution
                if snps['POS'][pos]['ref'] + alt in ['AG', 'GA', 'CT', 'TC', 'CU', 'UC']:
                    score[pos] += snps['POS'][pos]['ALT'][alt]['AF']
                else:
                    score[pos] -= snps['POS'][pos]['ALT'][alt]['AF']

        return score
An alignment class that allows computation of several stats
def __init__(self, alignment_string: str, reference_id: str = None, zoom_range: tuple | int = None):
Initialise an Alignment object.
Parameters
- alignment_string: Path to alignment file or raw alignment string
- reference_id: reference id
- zoom_range: start and stop positions to zoom into the alignment
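A minimal usage sketch (the FASTA string and sequence ids below are made-up example data; a path to an alignment file works the same way):

    from msaexplorer import explore

    fasta = ">seq1\nATG-CTA\n>seq2\nATGGCTA\n>seq3\nATGGCTT\n"
    msa = explore.MSA(fasta, reference_id="seq1", zoom_range=None)
    print(msa.aln_type)  # 'DNA' for this toy alignment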
@property
def aln_type(self) -> str:
The alignment type: RNA, DNA or AA.
@property
def alignment(self) -> dict:
(zoomed) version of the alignment.
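For example (continuing with the hypothetical msa object from above), setting zoom re-slices what alignment returns; note that the property slices with [zoom_start:zoom_stop], so the stop column itself is excluded:

    msa.zoom = (0, 3)      # validated via the setter
    print(msa.alignment)   # every sequence trimmed to columns 0-2
    msa.zoom = None        # back to the full alignment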
def get_reference_coords(self) -> tuple[int, int]:
Determine the start and end coordinates of the reference sequence defined as the first/last nucleotide in the reference sequence (excluding N and gaps).
Returns
start, end
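A short sketch (toy two-sequence alignment, made up for illustration) of how leading and trailing gaps in the reference are skipped:

    msa = explore.MSA(">ref\n--ATGC--\n>other\nTTATGCTT\n", reference_id="ref")
    print(msa.get_reference_coords())  # (2, 5): first/last non-gap, non-N position of 'ref'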
def get_consensus(self, threshold: float = None, use_ambig_nt: bool = False) -> str:
Creates a non-gapped consensus sequence.
Parameters
- threshold: Threshold for the consensus sequence. If use_ambig_nt = True the ambig. char that encodes the nucleotides that reach a cumulative frequency >= threshold is used. Otherwise 'N' (for nt alignments) or 'X' (for aa alignments) is used if none of the characters reach a cumulative frequency >= threshold.
- use_ambig_nt: Use an ambiguous nt character if none of the possible nt at an alignment position has a frequency above the defined threshold.
Returns
consensus sequence
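For instance, assuming a nucleotide msa object as above:

    cons = msa.get_consensus()  # plain majority consensus
    # ambiguous chars where no single nt reaches 80 % cumulative frequency
    cons_ambig = msa.get_consensus(threshold=0.8, use_ambig_nt=True)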
def get_conserved_orfs(self, min_length: int = 100, identity_cutoff: float | None = None) -> dict:
conserved ORF definition:
- conserved starts and stops
- start and stop must be in the same frame
- stop - start must be at least min_length
- all ungapped seqs[start:stop] must have at least min_length
- no ungapped seq can have a stop codon between start and stop
Conservation is measured as the number of positions with identical characters divided by the length of the ORF slice of the alignment.
Algorithm overview:
- check for conserved start and stop codons
- iterate over all three frames
- check each start and the next sufficiently far away stop codon
- check if all ungapped seqs between start and stop codon are >= min_length
- check that no ungapped seq in the alignment has an internal stop codon
- write to dictionary
- classify as internal if the stop codon has already been written with a prior start
- repeat for the reverse complement
Returns
ORF positions and internal ORF positions
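A usage sketch (hypothetical msa object holding a coding nucleotide alignment):

    orfs = msa.get_conserved_orfs(min_length=150, identity_cutoff=90)
    for name, orf in orfs.items():
        print(name, orf['location'], orf['frame'], orf['strand'], round(orf['conservation'], 1))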
def get_non_overlapping_conserved_orfs(self, min_length: int = 100, identity_cutoff: float = None) -> dict:
First calculates all conserved ORFs and then searches from the 5' end for all non-overlapping ORFs on the forward strand and from the 3' end for all non-overlapping ORFs on the reverse strand.
No overlap algorithm:

    frame 1: -[M------*]--- ----[M--*]---------[M-----
    frame 2: -------[M------*]---------[M---*]--------
    frame 3: [M---*]-----[M----------*]----------[M---
    results: [M---*][M------*]--[M--*]-[M---*]-[M-----
    frame:    3      2          1       2       1
Returns
dictionary with non-overlapping orfs
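The result is, by construction, a subset of get_conserved_orfs(), e.g.:

    all_orfs = msa.get_conserved_orfs(min_length=150)
    nov_orfs = msa.get_non_overlapping_conserved_orfs(min_length=150)
    assert set(nov_orfs) <= set(all_orfs)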
def calc_length_stats(self) -> dict:
Determine the stats for the length of the ungapped seqs in the alignment.
Returns
dictionary with length stats
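For example:

    stats = msa.calc_length_stats()
    print(stats['number of seq'], stats['mean length'], stats['min length'], stats['max length'])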
def calc_entropy(self) -> list:
Calculate the normalized Shannon entropy for every position in the alignment:
- 1: high entropy
- 0: low entropy
Returns
Entropies at each position.
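A small sketch with a made-up alignment; fully conserved columns score 0, maximally variable columns approach 1:

```python
from msaexplorer import explore

aln = explore.MSA(">s1\nATGCGT\n>s2\nATGCAT\n>s3\nATGTGT")
entropies = aln.calc_entropy()  # one value per alignment column
print([round(e, 2) for e in entropies])
```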
662 def calc_gc(self) -> list | TypeError: 663 """ 664 Determine the GC content for every position in an nt alignment. 665 :return: GC content for every position. 666 :raises: TypeError for AA alignments 667 """ 668 if self.aln_type == 'AA': 669 raise TypeError("GC computation is not possible for aminoacid alignment") 670 671 gc, aln, amb_nucs = [], self.alignment, config.AMBIG_CHARS[self.aln_type] 672 673 for position in range(self.length): 674 nucleotides = str() 675 for record in aln: 676 nucleotides = nucleotides + aln[record][position] 677 # ini dict with chars that occur and which ones to 678 # count in which freq 679 to_count = { 680 'G': 1, 681 'C': 1, 682 } 683 # handle ambig. nuc 684 for char in amb_nucs: 685 if char in nucleotides: 686 to_count[char] = (amb_nucs[char].count('C') + amb_nucs[char].count('G')) / len(amb_nucs[char]) 687 688 gc.append( 689 sum([nucleotides.count(x) * to_count[x] for x in to_count]) / len(nucleotides) 690 ) 691 692 return gc
Determine the GC content for every position in a nucleotide alignment.
Returns
GC content for every position.
Raises
- TypeError for AA alignments
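For example (toy alignment; ambiguous bases are counted proportionally to their possible G/C resolutions):

```python
from msaexplorer import explore

aln = explore.MSA(">s1\nATGCGT\n>s2\nATGCAT\n>s3\nATGTGT")
gc = aln.calc_gc()
print(gc[2])  # 1.0 -- column 2 contains only 'G'
```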
694 def calc_coverage(self) -> list: 695 """ 696 Determine the coverage of every position in an alignment. 697 This is defined as: 698 1 - cumulative length of '-' characters 699 700 :return: Coverage at each alignment position. 701 """ 702 coverage, aln = [], self.alignment 703 704 for nuc_pos in range(self.length): 705 pos = str() 706 for record in aln.keys(): 707 pos = pos + aln[record][nuc_pos] 708 coverage.append(1 - pos.count('-') / len(pos)) 709 710 return coverage
Determine the coverage of every position in the alignment, defined as: 1 - (fraction of '-' characters in that column).
Returns
Coverage at each alignment position.
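For example, with a made-up gapped alignment:

```python
from msaexplorer import explore

aln = explore.MSA(">s1\nATG-CT\n>s2\nATGGCT\n>s3\nAT--TT")
cov = aln.calc_coverage()
print(cov[3])  # column 3 holds '-', 'G', '-' -> 1 - 2/3 = 0.33
```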
712 def calc_reverse_complement_alignment(self) -> dict | TypeError: 713 """ 714 Reverse complement the alignment. 715 :return: Alignment (rv) 716 """ 717 if self.aln_type == 'AA': 718 raise TypeError('Reverse complement only for RNA or DNA.') 719 720 aln = self.alignment 721 reverse_complement_dict = {} 722 723 for seq_id in aln: 724 reverse_complement_dict[seq_id] = ''.join(config.COMPLEMENT[base] for base in reversed(aln[seq_id])) 725 726 return reverse_complement_dict
Reverse complement the alignment.
Returns
reverse-complemented alignment as a dictionary mapping sequence ids to sequences
Raises
- TypeError for AA alignments
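A short sketch with a toy ungapped nucleotide alignment:

```python
from msaexplorer import explore

aln = explore.MSA(">s1\nATGCGT\n>s2\nATGCAT\n>s3\nATGTGT")
rc = aln.calc_reverse_complement_alignment()
print(rc['s1'])  # ACGCAT
```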
728 def calc_numerical_alignment(self, encode_mask:bool=False, encode_ambiguities:bool=False): 729 """ 730 Transforms the alignment to numerical values. Ambiguities are encoded as -3, mask as -2 and the 731 remaining chars with the idx + 1 of config.CHAR_COLORS[self.aln_type]['standard']. 732 733 :param encode_ambiguities: encode ambiguities as -2 734 :param encode_mask: encode mask with as -3 735 :returns matrix 736 """ 737 738 aln = self.alignment 739 sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())]) 740 # ini matrix 741 numerical_matrix = np.full(sequences.shape, np.nan, dtype=float) 742 # first encode mask 743 if encode_mask: 744 if self.aln_type == 'AA': 745 is_n_or_x = np.isin(sequences, ['X']) 746 else: 747 is_n_or_x = np.isin(sequences, ['N']) 748 numerical_matrix[is_n_or_x] = -2 749 # next encode ambig chars 750 if encode_ambiguities: 751 numerical_matrix[np.isin(sequences, [key for key in config.AMBIG_CHARS[self.aln_type] if key not in ['N', 'X', '-']])] = -3 752 # next convert each char into their respective values 753 for idx, char in enumerate(config.CHAR_COLORS[self.aln_type]['standard']): 754 numerical_matrix[np.isin(sequences, [char])] = idx + 1 755 756 return numerical_matrix
Transforms the alignment to numerical values. Ambiguities are encoded as -3, mask characters as -2 and the remaining characters with the idx + 1 of config.CHAR_COLORS[self.aln_type]['standard'].
Parameters
- encode_ambiguities: encode ambiguities as -3
- encode_mask: encode mask characters ('N' for nucleotide, 'X' for AA alignments) as -2
Returns
numerical matrix (numpy array)
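A sketch with a toy alignment containing a masked base ('N' in s3):

```python
from msaexplorer import explore

aln = explore.MSA(">s1\nATG-CT\n>s2\nATGGCT\n>s3\nATNGTT")
mat = aln.calc_numerical_alignment(encode_mask=True, encode_ambiguities=True)
print(mat.shape)  # (3, 6): one row per sequence, one column per position
print(mat[2, 2])  # -2.0 -- the masked 'N' in s3 (assuming 'N' is not a standard character)
```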
758 def calc_identity_alignment(self, encode_mismatches:bool=True, encode_mask:bool=False, encode_gaps:bool=True, encode_ambiguities:bool=False, encode_each_mismatch_char:bool=False) -> np.ndarray: 759 """ 760 Converts alignment to identity array (identical=0) compared to majority consensus or reference:\n 761 762 :param encode_mismatches: encode mismatch as -1 763 :param encode_mask: encode mask with value=-2 --> also in the reference 764 :param encode_gaps: encode gaps with np.nan --> also in the reference 765 :param encode_ambiguities: encode ambiguities with value=-3 766 :param encode_each_mismatch_char: for each mismatch encode characters separately - these values represent the idx+1 values of config.CHAR_COLORS[self.aln_type]['standard'] 767 :return: identity alignment 768 """ 769 770 aln = self.alignment 771 ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus() 772 773 # convert alignment to array 774 sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())]) 775 reference = np.array(list(ref)) 776 # ini matrix 777 identity_matrix = np.full(sequences.shape, 0, dtype=float) 778 779 is_identical = sequences == reference 780 781 if encode_gaps: 782 is_gap = sequences == '-' 783 else: 784 is_gap = np.full(sequences.shape, False) 785 786 if encode_mask: 787 if self.aln_type == 'AA': 788 is_n_or_x = np.isin(sequences, ['X']) 789 else: 790 is_n_or_x = np.isin(sequences, ['N']) 791 else: 792 is_n_or_x = np.full(sequences.shape, False) 793 794 if encode_ambiguities: 795 is_ambig = np.isin(sequences, [key for key in config.AMBIG_CHARS[self.aln_type] if key not in ['N', 'X', '-']]) 796 else: 797 is_ambig = np.full(sequences.shape, False) 798 799 if encode_mismatches: 800 is_mismatch = ~is_gap & ~is_identical & ~is_n_or_x & ~is_ambig 801 else: 802 is_mismatch = np.full(sequences.shape, False) 803 804 # encode every different character 805 if encode_each_mismatch_char: 806 for idx, char in enumerate(config.CHAR_COLORS[self.aln_type]['standard']): 807 new_encoding = np.isin(sequences, [char]) & is_mismatch 808 identity_matrix[new_encoding] = idx + 1 809 # or encode different with a single value 810 else: 811 identity_matrix[is_mismatch] = -1 # mismatch 812 813 identity_matrix[is_gap] = np.nan # gap 814 identity_matrix[is_n_or_x] = -2 # 'N' or 'X' 815 identity_matrix[is_ambig] = -3 # ambiguities 816 817 return identity_matrix
Converts the alignment to an identity array (identical = 0) compared to the majority consensus or the reference.
Parameters
- encode_mismatches: encode mismatches as -1
- encode_mask: encode mask characters ('N'/'X') as -2; this also applies to the reference
- encode_gaps: encode gaps as np.nan; this also applies to the reference
- encode_ambiguities: encode ambiguities as -3
- encode_each_mismatch_char: encode each mismatch character separately; these values are the idx+1 values of config.CHAR_COLORS[self.aln_type]['standard']
Returns
identity alignment
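A minimal sketch; with no reference set, sequences are compared against the majority consensus:

```python
from msaexplorer import explore

aln = explore.MSA(">s1\nATG-CT\n>s2\nATGGCT\n>s3\nATGGTT")
ident = aln.calc_identity_alignment()
# 0 = identical to consensus/reference, -1 = mismatch, np.nan = gap
print(ident)
```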
819 def calc_similarity_alignment(self, matrix_type:str|None=None, normalize:bool=True) -> np.ndarray: 820 """ 821 Calculate the similarity score between the alignment and the reference sequence, with normalization to highlight 822 differences. The similarity scores are scaled to the range [0, 1] based on the substitution matrix values for the 823 reference residue at each column. Gaps are encoded as np.nan. 824 825 The calculation follows these steps: 826 827 1. **Reference Sequence**: If a reference sequence is provided (via `self.reference_id`), it is used. Otherwise, 828 a consensus sequence is generated to serve as the reference. 829 2. **Substitution Matrix**: The similarity between residues is determined using a substitution matrix, such as 830 BLOSUM65 for amino acids or BLASTN for nucleotides. The matrix is loaded based on the alignment type. 831 3. **Per-Column Normalization (optional)**: 832 833 For each column in the alignment: 834 - The residue in the reference sequence is treated as the baseline for that column. 835 - The substitution scores for the reference residue are extracted from the substitution matrix. 836 - The scores are normalized to the range [0, 1] using the minimum and maximum possible scores for the reference residue. 837 - This ensures that identical residues (or those with high similarity to the reference) have high scores, 838 while more dissimilar residues have lower scores. 839 4. **Output**: 840 841 - The normalized similarity scores are stored in a NumPy array. 842 - Gaps (if any) or residues not present in the substitution matrix are encoded as `np.nan`. 843 844 :param: matrix_type: type of similarity score (if not set - AA: BLOSSUM65, RNA/DNA: BLASTN) 845 :param: normalize: whether to normalize the similarity scores to range [0, 1] 846 :return: A 2D NumPy array where each entry corresponds to the normalized similarity score between the aligned residue 847 and the reference residue for that column. Values range from 0 (low similarity) to 1 (high similarity). 848 Gaps and invalid residues are encoded as `np.nan`. 849 :raise: ValueError 850 If the specified substitution matrix is not available for the given alignment type. 
851 """ 852 853 aln = self.alignment 854 ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus() 855 if matrix_type is None: 856 if self.aln_type == 'AA': 857 matrix_type = 'BLOSUM65' 858 else: 859 matrix_type = 'TRANS' 860 # load substitution matrix as dictionary 861 try: 862 subs_matrix = config.SUBS_MATRICES[self.aln_type][matrix_type] 863 except KeyError: 864 raise ValueError( 865 f'The specified matrix does not exist for alignment type.\nAvailable matrices for {self.aln_type} are:\n{list(config.SUBS_MATRICES[self.aln_type].keys())}' 866 ) 867 868 # set dtype and convert alignment to a NumPy array for vectorized processing 869 dtype = np.dtype(float, metadata={'matrix': matrix_type}) 870 sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())]) 871 reference = np.array(list(ref)) 872 valid_chars = list(subs_matrix.keys()) 873 similarity_array = np.full(sequences.shape, np.nan, dtype=dtype) 874 875 for j, ref_char in enumerate(reference): 876 if ref_char not in valid_chars + ['-']: 877 continue 878 # Get local min and max for the reference residue 879 if normalize and ref_char != '-': 880 local_scores = subs_matrix[ref_char].values() 881 local_min, local_max = min(local_scores), max(local_scores) 882 883 for i, char in enumerate(sequences[:, j]): 884 if char not in valid_chars: 885 continue 886 # classify the similarity as max if the reference has a gap 887 similarity_score = subs_matrix[char][ref_char] if ref_char != '-' else 1 888 similarity_array[i, j] = (similarity_score - local_min) / (local_max - local_min) if normalize and ref_char != '-' else similarity_score 889 890 return similarity_array
Calculate the similarity score between the alignment and the reference sequence, with optional normalization to highlight differences. The similarity scores are scaled to the range [0, 1] based on the substitution matrix values for the reference residue at each column. Gaps are encoded as np.nan.
The calculation follows these steps:
1. **Reference sequence**: If a reference sequence is set (via `self.reference_id`), it is used. Otherwise, a majority consensus sequence is generated to serve as the reference.
2. **Substitution matrix**: The similarity between residues is determined using a substitution matrix, such as BLOSUM65 for amino acids or TRANS for nucleotides. The matrix is loaded based on the alignment type.
3. **Per-column normalization (optional)**: For each column in the alignment, the residue in the reference sequence is treated as the baseline. The substitution scores for the reference residue are extracted from the substitution matrix and normalized to the range [0, 1] using the minimum and maximum possible scores for that residue. This ensures that residues identical or highly similar to the reference get high scores, while more dissimilar residues get lower scores.
4. **Output**: The normalized similarity scores are stored in a NumPy array. Gaps and residues not present in the substitution matrix are encoded as `np.nan`.
Parameters
- matrix_type: substitution matrix to use (if not set - AA: BLOSUM65, RNA/DNA: TRANS)
- normalize: whether to normalize the similarity scores to the range [0, 1]
Returns
A 2D NumPy array where each entry is the similarity score between the aligned residue and the reference residue for that column. With normalization, values range from 0 (low similarity) to 1 (high similarity). Gaps and invalid residues are encoded as `np.nan`.
Raises
- ValueError if the specified substitution matrix is not available for the given alignment type.
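A short sketch; for nucleotide alignments the default matrix is TRANS, and the matrix name is attached to the dtype metadata of the returned array:

```python
from msaexplorer import explore

aln = explore.MSA(">s1\nATG-CT\n>s2\nATGGCT\n>s3\nATGGTT")
sim = aln.calc_similarity_alignment()
print(sim.dtype.metadata['matrix'])  # 'TRANS'
print(sim.shape)                     # (3, 6)
```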
892 def calc_position_matrix(self, matrix_type:str='PWM') -> np.ndarray | ValueError: 893 """ 894 Calculates a position matrix of the specified type for the given alignment. The function 895 supports generating matrices of types Position Frequency Matrix (PFM), Position Probability 896 Matrix (PPM), Position Weight Matrix (PWM), and cummulative Information Content (IC). It validates 897 the provided matrix type and includes pseudo-count adjustments to ensure robust calculations. 898 899 :param matrix_type: Type of position matrix to calculate. Accepted values are 'PFM', 'PPM', 900 'PWM', and 'IC'. Defaults to 'PWM'. 901 :type matrix_type: str 902 :raises ValueError: If the provided `matrix_type` is not one of the accepted values. 903 :return: A numpy array representing the calculated position matrix of the specified type. 904 :rtype: np.ndarray 905 """ 906 907 # ini 908 aln = self.alignment 909 if matrix_type not in ['PFM', 'PPM', 'IC', 'PWM']: 910 raise ValueError('Matrix_type must be PFM, PPM, IC or PWM.') 911 possible_chars = list(config.CHAR_COLORS[self.aln_type]['standard'].keys()) 912 sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())]) 913 914 # calc position frequency matrix 915 pfm = np.array([np.sum(sequences == char, 0) for char in possible_chars]) 916 if matrix_type == 'PFM': 917 return pfm 918 919 # calc position probability matrix (probability) 920 pseudo_count = 0.0001 # to avoid 0 values 921 pfm = pfm + pseudo_count 922 ppm_non_char_excluded = pfm/np.sum(pfm, axis=0) # use this for pwm/ic calculation 923 ppm = pfm/len(aln.keys()) # calculate the frequency based on row number 924 if matrix_type == 'PPM': 925 return ppm 926 927 # calc position weight matrix (log-likelihood) 928 pwm = np.log2(ppm_non_char_excluded * len(possible_chars)) 929 if matrix_type == 'PWM': 930 return pwm 931 932 # calc information content per position (in bits) - can be used to scale a ppm for sequence logos 933 ic = np.sum(ppm_non_char_excluded * pwm, axis=0) 934 if matrix_type == 'IC': 935 return ic 936 937 return None
Calculates a position matrix of the specified type for the given alignment. The function supports Position Frequency Matrix (PFM), Position Probability Matrix (PPM), Position Weight Matrix (PWM) and cumulative Information Content (IC). It validates the provided matrix type and applies pseudo-count adjustments to ensure robust calculations.
Parameters
- matrix_type: Type of position matrix to calculate. Accepted values are 'PFM', 'PPM', 'PWM', and 'IC'. Defaults to 'PWM'.
Raises
- ValueError: if the provided `matrix_type` is not one of the accepted values.
Returns
A numpy array representing the calculated position matrix of the specified type.
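For example (toy alignment; the row order of the matrix follows the characters in `config.CHAR_COLORS[aln_type]['standard']`):

```python
from msaexplorer import explore

aln = explore.MSA(">s1\nATGC\n>s2\nATGC\n>s3\nATCC")
pfm = aln.calc_position_matrix('PFM')  # raw counts per character and column
ic = aln.calc_position_matrix('IC')    # information content per column (bits)
print(pfm.shape, ic.shape)
```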
939 def calc_percent_recovery(self) -> dict: 940 """ 941 Recovery per sequence either compared to the majority consensus seq 942 or the reference seq.\n 943 Defined as:\n 944 945 `(1 - sum(N/X/- characters in ungapped ref regions))*100` 946 947 This is highly similar to how nextclade calculates recovery over reference. 948 949 :return: dict 950 """ 951 952 aln = self.alignment 953 954 if self.reference_id is not None: 955 ref = aln[self.reference_id] 956 else: 957 ref = self.get_consensus() # majority consensus 958 959 if not any(char != '-' for char in ref): 960 raise ValueError("Reference sequence is entirely gapped, cannot calculate recovery.") 961 962 963 # count 'N', 'X' and '-' chars in non-gapped regions 964 recovery_over_ref = dict() 965 966 # Get positions of non-gap characters in the reference 967 non_gap_positions = [i for i, char in enumerate(ref) if char != '-'] 968 cumulative_length = len(non_gap_positions) 969 970 # Calculate recovery 971 for seq_id in aln: 972 if seq_id == self.reference_id: 973 continue 974 seq = aln[seq_id] 975 count_invalid = sum( 976 seq[pos] == '-' or 977 (seq[pos] == 'X' if self.aln_type == "AA" else seq[pos] == 'N') 978 for pos in non_gap_positions 979 ) 980 recovery_over_ref[seq_id] = (1 - count_invalid / cumulative_length) * 100 981 982 return recovery_over_ref
Recovery per sequence, compared either to the majority consensus sequence or to the reference sequence.
Defined as:
`(1 - count of N/X/- characters at ungapped reference positions / number of ungapped reference positions) * 100`
This is very similar to how Nextclade calculates recovery over the reference.
Returns
dictionary mapping each sequence id to its percent recovery
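A sketch with made-up sequences; the exact values depend on the consensus or reference used:

```python
from msaexplorer import explore

aln = explore.MSA(">s1\nATGCGT\n>s2\nATG--T\n>s3\nNNGCGT")
print(aln.calc_percent_recovery())
# roughly {'s1': 100.0, 's2': 66.7, 's3': 66.7} against the majority consensus
```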
984 def calc_character_frequencies(self) -> dict: 985 """ 986 Calculate the percentage characters in the alignment: 987 The frequencies are counted by seq and in total. The 988 percentage of non-gap characters in the alignment is 989 relative to the total number of non-gap characters. 990 The gap percentage is relative to the sequence length. 991 992 The output is a nested dictionary. 993 994 :return: Character frequencies 995 """ 996 997 aln, aln_length = self.alignment, self.length 998 999 freqs = {'total': {'-': {'counts': 0, '% of alignment': float()}}} 1000 1001 for seq_id in aln: 1002 freqs[seq_id], all_chars = {'-': {'counts': 0, '% of alignment': float()}}, 0 1003 unique_chars = set(aln[seq_id]) 1004 for char in unique_chars: 1005 if char == '-': 1006 continue 1007 # add characters to dictionaries 1008 if char not in freqs[seq_id]: 1009 freqs[seq_id][char] = {'counts': 0, '% of non-gapped': 0} 1010 if char not in freqs['total']: 1011 freqs['total'][char] = {'counts': 0, '% of non-gapped': 0} 1012 # count non-gap chars 1013 freqs[seq_id][char]['counts'] += aln[seq_id].count(char) 1014 freqs['total'][char]['counts'] += freqs[seq_id][char]['counts'] 1015 all_chars += freqs[seq_id][char]['counts'] 1016 # normalize counts 1017 for char in freqs[seq_id]: 1018 if char == '-': 1019 continue 1020 freqs[seq_id][char]['% of non-gapped'] = freqs[seq_id][char]['counts'] / all_chars * 100 1021 freqs['total'][char]['% of non-gapped'] += freqs[seq_id][char]['% of non-gapped'] 1022 # count gaps 1023 freqs[seq_id]['-']['counts'] = aln[seq_id].count('-') 1024 freqs['total']['-']['counts'] += freqs[seq_id]['-']['counts'] 1025 # normalize gap counts 1026 freqs[seq_id]['-']['% of alignment'] = freqs[seq_id]['-']['counts'] / aln_length * 100 1027 freqs['total']['-']['% of alignment'] += freqs[seq_id]['-']['% of alignment'] 1028 1029 # normalize the total counts 1030 for char in freqs['total']: 1031 for value in freqs['total'][char]: 1032 if value == '% of alignment' or value == '% of non-gapped': 1033 freqs['total'][char][value] = freqs['total'][char][value] / len(aln) 1034 1035 return freqs
Calculate the character frequencies in the alignment: frequencies are counted per sequence and in total. The percentage of non-gap characters is relative to the total number of non-gap characters in that sequence, while the gap percentage is relative to the alignment length.
The output is a nested dictionary.
Returns
Character frequencies
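For example:

```python
from msaexplorer import explore

aln = explore.MSA(">s1\nATG-CT\n>s2\nATGGCT\n>s3\nATGGTT")
freqs = aln.calc_character_frequencies()
print(freqs['s1']['-'])     # per-sequence gap counts and '% of alignment'
print(freqs['total']['G'])  # total counts and averaged '% of non-gapped'
```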
1037 def calc_pairwise_identity_matrix(self, distance_type:str='ghd') -> ndarray: 1038 """ 1039 Calculate pairwise identities for an alignment. As there are different definitions of sequence identity, there are different options implemented: 1040 1041 **1) ghd (global hamming distance)**: At each alignment position, check if characters match: 1042 \ndistance = matches / alignment_length * 100 1043 1044 **2) lhd (local hamming distance)**: Restrict the alignment to the region in both sequences that do not start and end with gaps: 1045 \ndistance = matches / min(5'3' ungapped seq1, 5'3' ungapped seq2) * 100 1046 1047 **3) ged (gap excluded distance)**: All gaps are excluded from the alignment 1048 \ndistance = matches / (matches + mismatches) * 100 1049 1050 **4) gcd (gap compressed distance)**: All consecutive gaps are compressed to one mismatch. 1051 \ndistance = matches / gap_compressed_alignment_length * 100 1052 1053 :return: array with pairwise distances. 1054 """ 1055 1056 def hamming_distance(seq1: str, seq2: str) -> int: 1057 return sum(c1 == c2 for c1, c2 in zip(seq1, seq2)) 1058 1059 def ghd(seq1: str, seq2: str) -> float: 1060 return hamming_distance(seq1, seq2) / self.length * 100 1061 1062 def lhd(seq1, seq2): 1063 # Trim gaps from both sides 1064 i, j = 0, self.length - 1 1065 while i < self.length and (seq1[i] == '-' or seq2[i] == '-'): 1066 i += 1 1067 while j >= 0 and (seq1[j] == '-' or seq2[j] == '-'): 1068 j -= 1 1069 if i > j: 1070 return 0.0 1071 1072 seq1_, seq2_ = seq1[i:j + 1], seq2[i:j + 1] 1073 matches = sum(c1 == c2 for c1, c2 in zip(seq1_, seq2_)) 1074 length = j - i + 1 1075 return (matches / length) * 100 if length > 0 else 0.0 1076 1077 def ged(seq1: str, seq2: str) -> float: 1078 1079 matches, mismatches = 0, 0 1080 1081 for c1, c2 in zip(seq1, seq2): 1082 if c1 != '-' and c2 != '-': 1083 if c1 == c2: 1084 matches += 1 1085 else: 1086 mismatches += 1 1087 return matches / (matches + mismatches) * 100 if (matches + mismatches) > 0 else 0 1088 1089 def gcd(seq1: str, seq2: str) -> float: 1090 matches = 0 1091 mismatches = 0 1092 in_gap = False 1093 1094 for char1, char2 in zip(seq1, seq2): 1095 if char1 == '-' and char2 == '-': # Shared gap: do nothing 1096 continue 1097 elif char1 == '-' or char2 == '-': # Gap in only one sequence 1098 if not in_gap: # Start of a new gap stretch 1099 mismatches += 1 1100 in_gap = True 1101 else: # No gaps 1102 in_gap = False 1103 if char1 == char2: # Matching characters 1104 matches += 1 1105 else: # Mismatched characters 1106 mismatches += 1 1107 1108 return matches / (matches + mismatches) * 100 if (matches + mismatches) > 0 else 0 1109 1110 1111 # Map distance type to corresponding function 1112 distance_functions: Dict[str, Callable[[str, str], float]] = { 1113 'ghd': ghd, 1114 'lhd': lhd, 1115 'ged': ged, 1116 'gcd': gcd 1117 } 1118 1119 if distance_type not in distance_functions: 1120 raise ValueError(f"Invalid distance type '{distance_type}'. Choose from {list(distance_functions.keys())}.") 1121 1122 # Compute pairwise distances 1123 aln = self.alignment 1124 distance_func = distance_functions[distance_type] 1125 distance_matrix = np.zeros((len(aln), len(aln))) 1126 1127 sequences = list(aln.values()) 1128 n = len(sequences) 1129 for i in range(n): 1130 seq1 = sequences[i] 1131 for j in range(i, n): 1132 seq2 = sequences[j] 1133 dist = distance_func(seq1, seq2) 1134 distance_matrix[i, j] = dist 1135 distance_matrix[j, i] = dist 1136 1137 return distance_matrix
Calculate pairwise identities for an alignment. As there are different definitions of sequence identity, there are different options implemented:
**1) ghd (global hamming distance)**: At each alignment position, check if characters match:
distance = matches / alignment_length * 100
**2) lhd (local hamming distance)**: Restrict the alignment to the region in both sequences that do not start and end with gaps:
distance = matches / min(5'3' ungapped seq1, 5'3' ungapped seq2) * 100
**3) ged (gap excluded distance)**: All gaps are excluded from the alignment
distance = matches / (matches + mismatches) * 100
**4) gcd (gap compressed distance)**: All consecutive gaps are compressed to one mismatch.
distance = matches / gap_compressed_alignment_length * 100
Returns
array with pairwise identities (in percent), symmetric across the diagonal
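A minimal sketch using the gap-excluded definition:

```python
from msaexplorer import explore

aln = explore.MSA(">s1\nATG-CT\n>s2\nATGGCT\n>s3\nATGGTT")
m = aln.calc_pairwise_identity_matrix(distance_type='ged')
print(m)  # symmetric 3x3 matrix of percent identities
```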
1139 def get_snps(self, include_ambig:bool=False) -> dict: 1140 """ 1141 Calculate snps similar to snp-sites (output is comparable): 1142 https://github.com/sanger-pathogens/snp-sites 1143 Importantly, SNPs are only considered if at least one of the snps is not an ambiguous character. 1144 The SNPs are compared to a majority consensus sequence or to a reference if it has been set. 1145 1146 :param include_ambig: Include ambiguous snps (default: False) 1147 :return: dictionary containing snp positions and their variants including their frequency. 1148 """ 1149 aln = self.alignment 1150 ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus() 1151 aln = {x: aln[x] for x in aln.keys() if x != self.reference_id} 1152 seq_ids = list(aln.keys()) 1153 snp_dict = {'#CHROM': self.reference_id if self.reference_id is not None else 'consensus', 'POS': {}} 1154 1155 for pos in range(self.length): 1156 reference_char = ref[pos] 1157 if not include_ambig: 1158 if reference_char in config.AMBIG_CHARS[self.aln_type] and reference_char != '-': 1159 continue 1160 alt_chars, snps = [], [] 1161 for i, seq_id in enumerate(aln.keys()): 1162 alt_chars.append(aln[seq_id][pos]) 1163 if reference_char != aln[seq_id][pos]: 1164 snps.append(i) 1165 if not snps: 1166 continue 1167 if include_ambig: 1168 if all(alt_chars[x] in config.AMBIG_CHARS[self.aln_type] for x in snps): 1169 continue 1170 else: 1171 snps = [x for x in snps if alt_chars[x] not in config.AMBIG_CHARS[self.aln_type]] 1172 if not snps: 1173 continue 1174 if pos not in snp_dict: 1175 snp_dict['POS'][pos] = {'ref': reference_char, 'ALT': {}} 1176 for snp in snps: 1177 if alt_chars[snp] not in snp_dict['POS'][pos]['ALT']: 1178 snp_dict['POS'][pos]['ALT'][alt_chars[snp]] = { 1179 'AF': 1, 1180 'SEQ_ID': [seq_ids[snp]] 1181 } 1182 else: 1183 snp_dict['POS'][pos]['ALT'][alt_chars[snp]]['AF'] += 1 1184 snp_dict['POS'][pos]['ALT'][alt_chars[snp]]['SEQ_ID'].append(seq_ids[snp]) 1185 # calculate AF 1186 if pos in snp_dict['POS']: 1187 for alt in snp_dict['POS'][pos]['ALT']: 1188 snp_dict['POS'][pos]['ALT'][alt]['AF'] /= len(aln) 1189 1190 return snp_dict
Calculate SNPs similar to snp-sites (the output is comparable): https://github.com/sanger-pathogens/snp-sites. Importantly, a position is only reported if at least one of its variant characters is unambiguous. The SNPs are called against a majority consensus sequence, or against the reference if one has been set.
Parameters
- include_ambig: include ambiguous SNPs (default: False)
Returns
dictionary containing SNP positions and their variants, including variant frequencies
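A small sketch; without a reference, SNPs are called against the majority consensus:

```python
from msaexplorer import explore

aln = explore.MSA(">s1\nATGCGT\n>s2\nATGCAT\n>s3\nATGTGT")
snps = aln.get_snps()
print(snps['#CHROM'])  # 'consensus' unless a reference id was set
for pos, info in snps['POS'].items():
    print(pos, info['ref'], info['ALT'])  # ALT maps variant -> {'AF', 'SEQ_ID'}
```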
1192 def calc_transition_transversion_score(self) -> list: 1193 """ 1194 Based on the snp positions, calculates a transition/transversions score. 1195 A positive score means higher ratio of transitions and negative score means 1196 a higher ratio of transversions. 1197 :return: list 1198 """ 1199 1200 if self.aln_type == 'AA': 1201 raise TypeError('TS/TV scoring only for RNA/DNA alignments') 1202 1203 # ini 1204 snps = self.get_snps() 1205 score = [0]*self.length 1206 1207 for pos in snps['POS']: 1208 t_score_temp = 0 1209 for alt in snps['POS'][pos]['ALT']: 1210 # check the type of substitution 1211 if snps['POS'][pos]['ref'] + alt in ['AG', 'GA', 'CT', 'TC', 'CU', 'UC']: 1212 score[pos] += snps['POS'][pos]['ALT'][alt]['AF'] 1213 else: 1214 score[pos] -= snps['POS'][pos]['ALT'][alt]['AF'] 1215 1216 return score
Based on the SNP positions, calculates a transition/transversion score. A positive score means a higher proportion of transitions, a negative score a higher proportion of transversions.
Returns
list of per-position scores
Raises
- TypeError for AA alignments
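Continuing the toy SNP example (both substitutions, C to T and G to A, are transitions, so the affected positions should score positively):

```python
from msaexplorer import explore

aln = explore.MSA(">s1\nATGCGT\n>s2\nATGCAT\n>s3\nATGTGT")
scores = aln.calc_transition_transversion_score()
print(scores)  # non-zero only at SNP positions
```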
1219class Annotation: 1220 """ 1221 An annotation class that allows to read in gff, gb or bed files and adjust its locations to that of the MSA. 1222 """ 1223 1224 def __init__(self, aln: MSA, annotation_path: str): 1225 """ 1226 The annotation class. Lets you parse multiple standard formats 1227 which might be used for annotating an alignment. The main purpose 1228 is to parse the annotation file and adapt the locations of diverse 1229 features to the locations within the alignment, considering the 1230 respective alignment positions. Importantly, IDs of the alignment 1231 and the MSA have to partly match. 1232 1233 :param aln: MSA class 1234 :param annotation_path: path to annotation file (gb, bed, gff) or raw string 1235 1236 """ 1237 1238 self.ann_type, self._seq_id, self.locus, self.features = self._parse_annotation(annotation_path, aln) # read annotation 1239 self._gapped_seq = self._MSA_validation_and_seq_extraction(aln, self._seq_id) # extract gapped sequence 1240 self._position_map = self._build_position_map() # build a position map 1241 self._map_to_alignment() # adapt feature locations 1242 1243 @staticmethod 1244 def _MSA_validation_and_seq_extraction(aln: MSA, seq_id: str) -> str: 1245 """ 1246 extract gapped sequence from MSA that corresponds to annotation 1247 :param aln: MSA class 1248 :param seq_id: sequence id to extract 1249 :return: gapped sequence 1250 """ 1251 if not isinstance(aln, MSA): 1252 raise ValueError('alignment has to be an MSA class. use explore.MSA() to read in alignment') 1253 else: 1254 return aln._alignment[seq_id] 1255 1256 @staticmethod 1257 def _parse_annotation(annotation_path: str, aln: MSA) -> tuple[str, str, str, Dict]: 1258 1259 def detect_annotation_type(file_path: str) -> str: 1260 """ 1261 Detect the type of annotation file (GenBank, GFF, or BED) based 1262 on the first relevant line (excluding empty and #) 1263 1264 :param file_path: Path to the annotation file. 1265 :return: The detected file type ('gb', 'gff', or 'bed'). 1266 1267 :raises ValueError: If the file type cannot be determined. 1268 """ 1269 1270 with _get_line_iterator(file_path) as file: 1271 for line in file: 1272 # skip empty lines and comments 1273 if not line.strip() or line.startswith('#'): 1274 continue 1275 # genbank 1276 if line.startswith('LOCUS'): 1277 return 'gb' 1278 # gff 1279 if len(line.split('\t')) == 9: 1280 # Check for expected values 1281 columns = line.split('\t') 1282 if columns[6] in ['+', '-', '.'] and re.match(r'^\d+$', columns[3]) and re.match(r'^\d+$',columns[4]): 1283 return 'gff' 1284 # BED files are tab-delimited with at least 3 fields: chrom, start, end 1285 fields = line.split('\t') 1286 if len(fields) >= 3 and re.match(r'^\d+$', fields[1]) and re.match(r'^\d+$', fields[2]): 1287 return 'bed' 1288 # only read in the first line 1289 break 1290 1291 raise ValueError( 1292 "File type could not be determined. Ensure the file follows a recognized format (GenBank, GFF, or BED).") 1293 1294 def parse_gb(file_path) -> dict: 1295 """ 1296 parse a genebank file to dictionary - primarily retained are the informations 1297 for qualifiers as these will be used for plotting. 
1298 1299 :param file_path: path to genebank file 1300 :return: nested dictionary 1301 1302 """ 1303 1304 def sanitize_gb_location(string: str) -> tuple[list, str]: 1305 """ 1306 see: https://www.insdc.org/submitting-standards/feature-table/ 1307 """ 1308 strand = '+' 1309 locations = [] 1310 # check the direction of the annotation 1311 if 'complement' in string: 1312 strand = '-' 1313 # sanitize operators 1314 for operator in ['complement(', 'join(', 'order(']: 1315 string = string.replace(operator, '') 1316 # sanitize possible chars for splitting start stop - 1317 # however in the future might not simply do this 1318 # as some useful information is retained 1319 for char in ['>', '<', ')']: 1320 string = string.replace(char, '') 1321 # check if we have multiple location e.g. due to splicing 1322 if ',' in string: 1323 raw_locations = string.split(',') 1324 else: 1325 raw_locations = [string] 1326 # try to split start and stop 1327 for location in raw_locations: 1328 for sep in ['..', '.', '^']: 1329 if sep in location: 1330 sanitized_locations = [int(x) for x in location.split(sep)] 1331 sanitized_locations[0] = sanitized_locations[0] - 1 # enforce 0-based starts 1332 locations.append(sanitized_locations) 1333 break 1334 1335 return locations, strand 1336 1337 1338 records = {} 1339 with _get_line_iterator(file_path) as file: 1340 record = None 1341 in_features = False 1342 counter_dict = {} 1343 for line in file: 1344 line = line.rstrip() 1345 parts = line.split() 1346 # extract the locus id 1347 if line.startswith('LOCUS'): 1348 if record: 1349 records[record['locus']] = record 1350 record = { 1351 'locus': parts[1], 1352 'features': {} 1353 } 1354 1355 elif line.startswith('FEATURES'): 1356 in_features = True 1357 1358 # ignore the sequence info 1359 elif line.startswith('ORIGIN'): 1360 in_features = False 1361 1362 # now write useful feature information to dictionary 1363 elif in_features: 1364 if not line.strip(): 1365 continue 1366 if line[5] != ' ': 1367 location_line = True # remember that we are in a location for multi-line locations 1368 feature_type, qualifier = parts[0], parts[1] 1369 if feature_type not in record['features']: 1370 record['features'][feature_type] = {} 1371 counter_dict[feature_type] = 0 1372 locations, strand = sanitize_gb_location(qualifier) 1373 record['features'][feature_type][counter_dict[feature_type]] = { 1374 'location': locations, 1375 'strand': strand 1376 } 1377 counter_dict[feature_type] += 1 1378 else: 1379 # edge case for multi-line locations 1380 if location_line and not line.strip().startswith('/'): 1381 locations, strand = sanitize_gb_location(parts[0]) 1382 for loc in locations: 1383 record['features'][feature_type][counter_dict[feature_type]]['location'].append(loc) 1384 else: 1385 location_line = False 1386 try: 1387 qualifier_type, qualifier = parts[0].split('=') 1388 except ValueError: # we are in the coding sequence 1389 qualifier = qualifier + parts[0] 1390 1391 qualifier_type, qualifier = qualifier_type.lstrip('/'), qualifier.strip('"') 1392 last_index = counter_dict[feature_type] - 1 1393 record['features'][feature_type][last_index][qualifier_type] = qualifier 1394 1395 records[record['locus']] = record 1396 1397 return records 1398 1399 def parse_gff(file_path) -> dict: 1400 """ 1401 Parse a GFF3 (General Feature Format) file into a dictionary structure. 
1402 1403 :param file_path: path to genebank file 1404 :return: nested dictionary 1405 1406 """ 1407 records = {} 1408 with _get_line_iterator(file_path) as file: 1409 previous_id, previous_feature = None, None 1410 for line in file: 1411 if line.startswith('#') or not line.strip(): 1412 continue 1413 parts = line.strip().split('\t') 1414 seqid, source, feature_type, start, end, score, strand, phase, attributes = parts 1415 # ensure that region and source features are not named differently for gff and gb 1416 if feature_type == 'region': 1417 feature_type = 'source' 1418 if seqid not in records: 1419 records[seqid] = {'locus': seqid, 'features': {}} 1420 if feature_type not in records[seqid]['features']: 1421 records[seqid]['features'][feature_type] = {} 1422 1423 feature_id = len(records[seqid]['features'][feature_type]) 1424 feature = { 1425 'strand': strand, 1426 } 1427 1428 # Parse attributes into key-value pairs 1429 for attr in attributes.split(';'): 1430 if '=' in attr: 1431 key, value = attr.split('=', 1) 1432 feature[key.strip()] = value.strip() 1433 1434 # check if feature are the same --> possible splicing 1435 if previous_id is not None and previous_feature == feature: 1436 records[seqid]['features'][feature_type][previous_id]['location'].append([int(start)-1, int(end)]) 1437 else: 1438 records[seqid]['features'][feature_type][feature_id] = feature 1439 records[seqid]['features'][feature_type][feature_id]['location'] = [[int(start) - 1, int(end)]] 1440 # set new previous id and features -> new dict as 'location' is pointed in current feature and this 1441 # is the only key different if next feature has the same entries 1442 previous_id, previous_feature = feature_id, {key:value for key, value in feature.items() if key != 'location'} 1443 1444 return records 1445 1446 def parse_bed(file_path) -> dict: 1447 """ 1448 Parse a BED file into a dictionary structure. 
1449 1450 :param file_path: path to genebank file 1451 :return: nested dictionary 1452 1453 """ 1454 records = {} 1455 with _get_line_iterator(file_path) as file: 1456 for line in file: 1457 if line.startswith('#') or not line.strip(): 1458 continue 1459 parts = line.strip().split('\t') 1460 chrom, start, end, *optional = parts 1461 1462 if chrom not in records: 1463 records[chrom] = {'locus': chrom, 'features': {}} 1464 feature_type = 'region' 1465 if feature_type not in records[chrom]['features']: 1466 records[chrom]['features'][feature_type] = {} 1467 1468 feature_id = len(records[chrom]['features'][feature_type]) 1469 feature = { 1470 'location': [[int(start), int(end)]], # BED uses 0-based start, convert to 1-based 1471 'strand': '+', # assume '+' if not present 1472 } 1473 1474 # Handle optional columns (name, score, strand) --> ignore 7-12 1475 if len(optional) >= 1: 1476 feature['name'] = optional[0] 1477 if len(optional) >= 2: 1478 feature['score'] = optional[1] 1479 if len(optional) >= 3: 1480 feature['strand'] = optional[2] 1481 1482 records[chrom]['features'][feature_type][feature_id] = feature 1483 1484 return records 1485 1486 parse_functions: Dict[str, Callable[[str], dict]] = { 1487 'gb': parse_gb, 1488 'bed': parse_bed, 1489 'gff': parse_gff, 1490 } 1491 # determine the annotation content -> should be standard formatted 1492 try: 1493 annotation_type = detect_annotation_type(annotation_path) 1494 except ValueError as err: 1495 raise err 1496 1497 # read in the annotation 1498 annotations = parse_functions[annotation_type](annotation_path) 1499 1500 # sanity check whether one of the annotation ids and alignment ids match 1501 annotation_found = False 1502 for annotation in annotations.keys(): 1503 for aln_id in aln.alignment.keys(): 1504 aln_id_sanitized = aln_id.split(' ')[0] 1505 # check in both directions 1506 if aln_id_sanitized in annotation: 1507 annotation_found = True 1508 break 1509 if annotation in aln_id_sanitized: 1510 annotation_found = True 1511 break 1512 1513 if not annotation_found: 1514 raise ValueError(f'the annotations of {annotation_path} do not match any ids in the MSA') 1515 1516 # return only the annotation that has been found, the respective type and the seq_id to map to 1517 return annotation_type, aln_id, annotations[annotation]['locus'], annotations[annotation]['features'] 1518 1519 1520 def _build_position_map(self) -> Dict[int, int]: 1521 """ 1522 build a position map from a sequence. 1523 1524 :return genomic position: gapped position 1525 """ 1526 1527 position_map = {} 1528 genomic_pos = 0 1529 for aln_index, char in enumerate(self._gapped_seq): 1530 if char != '-': 1531 position_map[genomic_pos] = aln_index 1532 genomic_pos += 1 1533 # ensure the last genomic position is included 1534 position_map[genomic_pos] = len(self._gapped_seq) 1535 1536 return position_map 1537 1538 1539 def _map_to_alignment(self): 1540 """ 1541 Adjust all feature locations to alignment positions 1542 """ 1543 1544 def map_location(position_map: Dict[int, int], locations: list) -> list: 1545 """ 1546 Map genomic locations to alignment positions using a precomputed position map. 1547 1548 :param position_map: Positions mapped from gapped to ungapped 1549 :param locations: List of genomic start and end positions. 1550 :return: List of adjusted alignment positions. 
1551 """ 1552 1553 aligned_locs = [] 1554 for start, end in locations: 1555 try: 1556 aligned_start = position_map[start] 1557 aligned_end = position_map[end] 1558 aligned_locs.append([aligned_start, aligned_end]) 1559 except KeyError: 1560 raise ValueError(f"Positions {start}-{end} lie outside of the position map.") 1561 1562 return aligned_locs 1563 1564 for feature_type, features in self.features.items(): 1565 for feature_id, feature_data in features.items(): 1566 original_locations = feature_data['location'] 1567 aligned_locations = map_location(self._position_map, original_locations) 1568 feature_data['location'] = aligned_locations
An annotation class that reads in GFF, GenBank or BED files and adjusts the feature locations to the corresponding positions in the MSA.
1224 def __init__(self, aln: MSA, annotation_path: str): 1225 """ 1226 The annotation class. Lets you parse multiple standard formats 1227 which might be used for annotating an alignment. The main purpose 1228 is to parse the annotation file and adapt the locations of diverse 1229 features to the locations within the alignment, considering the 1230 respective alignment positions. Importantly, IDs of the alignment 1231 and the MSA have to partly match. 1232 1233 :param aln: MSA class 1234 :param annotation_path: path to annotation file (gb, bed, gff) or raw string 1235 1236 """ 1237 1238 self.ann_type, self._seq_id, self.locus, self.features = self._parse_annotation(annotation_path, aln) # read annotation 1239 self._gapped_seq = self._MSA_validation_and_seq_extraction(aln, self._seq_id) # extract gapped sequence 1240 self._position_map = self._build_position_map() # build a position map 1241 self._map_to_alignment() # adapt feature locations
The annotation class. Lets you parse multiple standard formats commonly used for annotating an alignment. The main purpose is to parse the annotation file and adapt the locations of its features to the positions within the alignment, taking alignment gaps into account. Importantly, the IDs of the annotation and the MSA have to match at least partly.
Parameters
- aln: MSA class
- annotation_path: path to annotation file (gb, bed, gff) or raw string
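A usage sketch (both file names are hypothetical; the annotation's sequence id has to at least partly match one of the MSA ids):

```python
from msaexplorer import explore

aln = explore.MSA('my_alignment.fasta')
annotation = explore.Annotation(aln, 'my_annotation.gff')
print(annotation.ann_type, annotation.locus)
for feature_type, features in annotation.features.items():
    # feature locations are already remapped to alignment coordinates
    print(feature_type, [f['location'] for f in features.values()])
```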