msaexplorer.explore

Explore module

This module contains the two classes MSA and Annotation, which are used to read in the respective files, compute several statistics, and serve as input for the draw module functions.

Classes

"""
# Explore module

This module contains the two classes `MSA` and `Annotation`, which are used to read in the respective files,
compute several statistics, and serve as input for the `draw` module functions.

## Classes

"""

# built-in
import os
import io
import math
import collections
import re
from typing import Callable, Dict

# installed
import numpy as np
from numpy import ndarray

# msaexplorer
from msaexplorer import config


def _get_line_iterator(source):
    """
    Allow reading from both raw strings and file paths.
    """
    if isinstance(source, str) and os.path.exists(source):
        return open(source, 'r')
    else:
        return io.StringIO(source)

class MSA:
    """
    An alignment class that allows computation of several stats.
    """

    def __init__(self, alignment_string: str, reference_id: str = None, zoom_range: tuple | int = None):
        """
        Initialise an Alignment object.
        :param alignment_string: path to alignment file or raw alignment string
        :param reference_id: reference id
        :param zoom_range: start and stop positions to zoom into the alignment
        """
        self._alignment = self._read_alignment(alignment_string)
        self._reference_id = self._validate_ref(reference_id, self._alignment)
        self._zoom = self._validate_zoom(zoom_range, self._alignment)
        self._aln_type = self._determine_aln_type(self._alignment)

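    # A minimal usage sketch (hypothetical sequence ids; the FASTA string below is
    # purely illustrative, not data shipped with msaexplorer):
    #
    #   from msaexplorer import explore
    #
    #   aln = explore.MSA('>seq1\nATG-CTG\n>seq2\nATGGCTG', zoom_range=(0, 5))
    #   aln.reference_id = 'seq1'  # setter validates the id against the alignment
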
    # TODO: read in different alignment types
    # Static methods
    @staticmethod
    def _read_alignment(file_path: str) -> dict:
        """
        Parse MSA alignment file.
        :param file_path: path to alignment file
        :return: dictionary with ids as keys and sequences as values
        """

        def add_seq(aln: dict, sequence_id: str, seq_list: list):
            """
            Add a complete sequence and check for non-allowed chars.
            :param aln: alignment dictionary to build
            :param sequence_id: sequence id to add
            :param seq_list: list of sequence chunks to join and add
            :return: alignment with the added sequence
            """
            final_seq = ''.join(seq_list).upper()
            # check for non-allowed characters
            invalid_chars = set(final_seq) - set(config.POSSIBLE_CHARS)
            if invalid_chars:
                raise ValueError(
                    f"{sequence_id} contains invalid characters: {', '.join(invalid_chars)}. Allowed chars are: {config.POSSIBLE_CHARS}"
                )
            aln[sequence_id] = final_seq

            return aln

        alignment, seq_lines = {}, []
        seq_id = None

        with _get_line_iterator(file_path) as file:
            for i, line in enumerate(file):
                line = line.strip()
                # initial check for fasta format
                if i == 0 and not line.startswith(">"):
                    raise ValueError('Alignment has to be in fasta format starting with >SeqID.')
                if line.startswith(">"):
                    if seq_id:
                        alignment = add_seq(alignment, seq_id, seq_lines)
                    # initialize a new sequence
                    seq_id, seq_lines = line[1:], []
                else:
                    seq_lines.append(line)
            # handle last sequence
            if seq_id:
                alignment = add_seq(alignment, seq_id, seq_lines)
        # final sanity checks
        if alignment:
            # alignment contains only one sequence
            if len(alignment) < 2:
                raise ValueError("Alignment must contain more than one sequence.")
            # alignment sequences are not the same length
            first_seq_len = len(next(iter(alignment.values())))
            for sequence_id, sequence in alignment.items():
                if len(sequence) != first_seq_len:
                    raise ValueError(
                        f"All alignment sequences must have the same length. Sequence '{sequence_id}' has length {len(sequence)}, expected {first_seq_len}."
                    )
            # all checks passed
            return alignment
        else:
            raise ValueError(f"Alignment file {file_path} does not contain any sequences in fasta format.")

    @staticmethod
    def _validate_ref(reference: str | None, alignment: dict) -> str | None:
        """
        Validate that the reference seq is indeed part of the alignment.
        :param reference: reference seq id
        :param alignment: alignment dict
        :return: validated reference
        :raises ValueError: if the reference is not part of the alignment
        """
        if reference is None or reference in alignment:
            return reference
        raise ValueError('Reference not in alignment.')

    @staticmethod
    def _validate_zoom(zoom: tuple | int, original_aln: dict) -> tuple | None:
        """
        Validates that the user-defined zoom range is within the start and end of the
        initial alignment.\n
        :param zoom: zoom range or zoom start
        :param original_aln: non-zoomed alignment dict
        :return: validated zoom range
        """
        if zoom is not None:
            aln_length = len(original_aln[list(original_aln.keys())[0]])
            # check if only one value is provided -> stop is the alignment length
            if isinstance(zoom, int):
                if 0 <= zoom < aln_length:
                    return zoom, aln_length - 1
                else:
                    raise ValueError('Zoom start must be within the alignment length range.')
            # check if more than 2 values are provided
            if len(zoom) != 2:
                raise ValueError('Zoom positions have to be (zoom_start, zoom_end).')
            # validate zoom start/stop
            for position in zoom:
                if not isinstance(position, int):
                    raise ValueError('Zoom positions have to be integers.')
                if position not in range(0, aln_length):
                    raise ValueError('Zoom position out of range.')

        return zoom

    @staticmethod
    def _determine_aln_type(alignment) -> str:
        """
        Determine the most likely type of alignment:
        if at least 70% of the chars in the alignment are nucleotide
        chars, it is most likely a nt alignment.
        :return: type of alignment
        """
        counter = 0
        for record in alignment:
            if 'U' in alignment[record]:
                return 'RNA'
            counter += sum(map(alignment[record].count, ['A', 'C', 'G', 'T', 'N', '-']))
        # determine which is the most likely type
        if counter / len(alignment) >= 0.7 * len(alignment[list(alignment.keys())[0]]):
            return 'DNA'
        else:
            return 'AA'

    # Properties with setters
    @property
    def reference_id(self):
        return self._reference_id

    @reference_id.setter
    def reference_id(self, ref_id: str):
        """
        Set and validate the reference id.
        """
        self._reference_id = self._validate_ref(ref_id, self.alignment)

    @property
    def zoom(self) -> tuple:
        return self._zoom

    @zoom.setter
    def zoom(self, zoom_pos: tuple | int):
        """
        Set and validate the zoom range.
        """
        self._zoom = self._validate_zoom(zoom_pos, self._alignment)

    # Property without setter
    @property
    def aln_type(self) -> str:
        """
        The alignment type:
        RNA, DNA or AA
        """
        return self._aln_type

    # On-the-fly properties without setters
    @property
    def length(self) -> int:
        return len(next(iter(self.alignment.values())))

    @property
    def alignment(self) -> dict:
        """
        (zoomed) version of the alignment.
        """
        if self.zoom is not None:
            zoomed_aln = dict()
            for seq in self._alignment:
                zoomed_aln[seq] = self._alignment[seq][self.zoom[0]:self.zoom[1]]
            return zoomed_aln
        else:
            return self._alignment

    # functions for different alignment stats
    def get_reference_coords(self) -> tuple[int, int]:
        """
        Determine the start and end coordinates of the reference sequence
        defined as the first/last nucleotide in the reference sequence
        (excluding N and gaps).

        :return: start, end
        """
        start, end = 0, self.length

        if self.reference_id is None:
            return start, end
        else:
            # 5' --> 3'
            for start in range(self.length):
                if self.alignment[self.reference_id][start] not in ['-', 'N']:
                    break
            # 3' --> 5' (down to and including position 0)
            for end in range(self.length - 1, -1, -1):
                if self.alignment[self.reference_id][end] not in ['-', 'N']:
                    break

            return start, end

    def get_consensus(self, threshold: float = None, use_ambig_nt: bool = False) -> str:
        """
        Creates a non-gapped consensus sequence.

        :param threshold: Threshold for the consensus sequence. If use_ambig_nt = True, the ambiguous char that encodes
            the nucleotides that reach a cumulative frequency >= threshold is used. Otherwise 'N' (for nt alignments)
            or 'X' (for aa alignments) is used if none of the characters reach a cumulative frequency >= threshold.
        :param use_ambig_nt: Use an ambiguous nt character if none of the possible nt at an alignment position
            has a frequency above the defined threshold.
        :return: consensus sequence
        """

        # helper functions
        def determine_counts(alignment_dict: dict, position: int) -> dict:
            """
            Count the number of each char at
            an idx of the alignment and return a sorted dict.
            Handles ambiguous nucleotides in sequences.
            Also handles gaps.
            """
            nucleotide_list = []

            # get all nucleotides
            for sequence in alignment_dict.items():
                nucleotide_list.append(sequence[1][position])
            # count occurrences of nucleotides
            counter = dict(collections.Counter(nucleotide_list))
            # get permutations of an ambiguous nucleotide
            to_delete = []
            temp_dict = {}
            for nucleotide in counter:
                if nucleotide in config.AMBIG_CHARS[self.aln_type]:
                    to_delete.append(nucleotide)
                    permutations = config.AMBIG_CHARS[self.aln_type][nucleotide]
                    adjusted_freq = 1 / len(permutations)
                    for permutation in permutations:
                        if permutation in temp_dict:
                            temp_dict[permutation] += adjusted_freq
                        else:
                            temp_dict[permutation] = adjusted_freq

            # drop ambiguous entries and add the adjusted freqs to the counter
            if to_delete:
                for i in to_delete:
                    counter.pop(i)
                for nucleotide in temp_dict:
                    if nucleotide in counter:
                        counter[nucleotide] += temp_dict[nucleotide]
                    else:
                        counter[nucleotide] = temp_dict[nucleotide]

            return dict(sorted(counter.items(), key=lambda x: x[1], reverse=True))

        def get_consensus_char(counts: dict, cutoff: float) -> list:
            """
            Get a list of nucleotides for the consensus seq.
            """
            n = 0

            consensus_chars = []
            for char in counts:
                n += counts[char]
                consensus_chars.append(char)
                if n >= cutoff:
                    break

            return consensus_chars

        def get_ambiguous_char(nucleotides: list) -> str:
            """
            Get the ambiguous char for a list of nucleotides.
            """
            for ambiguous, permutations in config.AMBIG_CHARS[self.aln_type].items():
                if set(permutations) == set(nucleotides):
                    break

            return ambiguous

        # check if params have been set correctly
        if threshold is not None:
            if threshold < 0 or threshold > 1:
                raise ValueError('Threshold must be between 0 and 1.')
        if self.aln_type == 'AA' and use_ambig_nt:
            raise ValueError('Ambiguous characters can not be calculated for amino acid alignments.')
        if threshold is None and use_ambig_nt:
            raise ValueError('To calculate ambiguous nucleotides, set a threshold > 0.')

        alignment = self.alignment
        consensus = str()

        if threshold is not None:
            consensus_cutoff = len(alignment) * threshold
        else:
            consensus_cutoff = 0

        # build the consensus sequence
        for idx in range(self.length):
            char_counts = determine_counts(alignment, idx)
            consensus_chars = get_consensus_char(
                char_counts,
                consensus_cutoff
            )
            if threshold != 0:
                if len(consensus_chars) > 1:
                    if use_ambig_nt:
                        char = get_ambiguous_char(consensus_chars)
                    else:
                        if self.aln_type == 'AA':
                            char = 'X'
                        else:
                            char = 'N'
                    consensus = consensus + char
                else:
                    consensus = consensus + consensus_chars[0]
            else:
                consensus = consensus + consensus_chars[0]

        return consensus

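    # A hedged usage sketch (continuing the hypothetical 'aln' from above):
    #
    #   aln.get_consensus()                                  # plain majority consensus
    #   aln.get_consensus(threshold=0.8)                     # 'N'/'X' where no chars reach 80 %
    #   aln.get_consensus(threshold=0.8, use_ambig_nt=True)  # ambiguous nt instead of 'N'
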
    def get_conserved_orfs(self, min_length: int = 100, identity_cutoff: float | None = None) -> dict:
        """
        **conserved ORF definition:**
            - conserved starts and stops
            - start and stop must be in the same frame
            - stop - start must be at least min_length
            - all ungapped seqs[start:stop] must have at least min_length
            - no ungapped seq can have a stop codon between start and stop

        Conservation is measured as the number of positions with identical characters divided by
        the length of the ORF slice of the alignment.

        **Algorithm overview:**
            - check for conserved start and stop codons
            - iterate over all three frames
            - check each start and the next sufficiently far away stop codon
            - check if all ungapped seqs between start and stop codon are >= min_length
            - check if no ungapped seq in the alignment has an additional stop codon
            - write to dictionary
            - classify as internal if the stop codon has already been written with a prior start
            - repeat for the reverse complement

        :return: ORF positions and internal ORF positions
        """

        # helper functions
        def determine_conserved_start_stops(alignment: dict, alignment_length: int) -> tuple:
            """
            Determine all start and stop codons within an alignment.
            :param alignment: alignment
            :param alignment_length: length of alignment
            :return: start and stop codon positions
            """
            starts = config.START_CODONS[self.aln_type]
            stops = config.STOP_CODONS[self.aln_type]

            list_of_starts, list_of_stops = [], []
            ref = alignment[list(alignment.keys())[0]]
            for nt_position in range(alignment_length):
                if ref[nt_position:nt_position + 3] in starts:
                    conserved_start = True
                    for sequence in alignment:
                        if not alignment[sequence][nt_position:].replace('-', '')[0:3] in starts:
                            conserved_start = False
                            break
                    if conserved_start:
                        list_of_starts.append(nt_position)

                if ref[nt_position:nt_position + 3] in stops:
                    conserved_stop = True
                    for sequence in alignment:
                        if not alignment[sequence][nt_position:].replace('-', '')[0:3] in stops:
                            conserved_stop = False
                            break
                    if conserved_stop:
                        list_of_stops.append(nt_position)

            return list_of_starts, list_of_stops

        def get_ungapped_sliced_seqs(alignment: dict, start_pos: int, stop_pos: int) -> list:
            """
            Slice sequences between start and stop codons and eliminate gaps.
            :param alignment: alignment
            :param start_pos: start codon
            :param stop_pos: stop codon
            :return: sliced sequences
            """
            ungapped_seqs = []
            for seq_id in alignment:
                ungapped_seqs.append(alignment[seq_id][start_pos:stop_pos + 3].replace('-', ''))

            return ungapped_seqs

        def additional_stops(ungapped_seqs: list) -> bool:
            """
            Checks for the presence of an additional stop codon.
            :param ungapped_seqs: list of ungapped sequences
            :return: additional stop codons present (True/False)
            """
            stops = config.STOP_CODONS[self.aln_type]

            for sliced_seq in ungapped_seqs:
                for position in range(0, len(sliced_seq) - 3, 3):
                    if sliced_seq[position:position + 3] in stops:
                        return True
            return False

        def calculate_identity(identity_matrix: ndarray, aln_slice: list) -> float:
            sliced_array = identity_matrix[:, aln_slice[0]:aln_slice[1]] + 1  # identical = 0, different = -1 --> add 1
            return np.sum(np.all(sliced_array == 1, axis=0)) / (aln_slice[1] - aln_slice[0]) * 100

        # checks for arguments
        if self.aln_type == 'AA':
            raise TypeError('ORF search only for RNA/DNA alignments')

        if identity_cutoff is not None:
            if identity_cutoff > 100 or identity_cutoff < 0:
                raise ValueError('conservation cutoff must be between 0 and 100')

        if min_length <= 6 or min_length > self.length:
            raise ValueError(f'min_length must be greater than 6 and at most {self.length}')

        # ini
        identities = self.calc_identity_alignment()
        alignments = [self.alignment, self.calc_reverse_complement_alignment()]
        aln_len = self.length

        orf_counter = 0
        orf_dict = {}

        for aln, direction in zip(alignments, ['+', '-']):
            # check for starts and stops in the first seq and then check if these are present in all seqs
            conserved_starts, conserved_stops = determine_conserved_start_stops(aln, aln_len)
            # check each frame
            for frame in (0, 1, 2):
                potential_starts = [x for x in conserved_starts if x % 3 == frame]
                potential_stops = [x for x in conserved_stops if x % 3 == frame]
                last_stop = -1
                for start in potential_starts:
                    # go to the next stop that is sufficiently far away in the alignment
                    next_stops = [x for x in potential_stops if x + 3 >= start + min_length]
                    if not next_stops:
                        continue
                    next_stop = next_stops[0]
                    ungapped_sliced_seqs = get_ungapped_sliced_seqs(aln, start, next_stop)
                    # re-check the lengths of all ungapped seqs
                    ungapped_seq_lengths = [len(x) >= min_length for x in ungapped_sliced_seqs]
                    if not all(ungapped_seq_lengths):
                        continue
                    # if no stop codon between start and stop --> write to dictionary
                    if not additional_stops(ungapped_sliced_seqs):
                        if direction == '+':
                            positions = [start, next_stop + 3]
                        else:
                            positions = [aln_len - next_stop - 3, aln_len - start]
                        if last_stop != next_stop:
                            last_stop = next_stop
                            conservation = calculate_identity(identities, positions)
                            if identity_cutoff is not None and conservation < identity_cutoff:
                                continue
                            orf_dict[f'ORF_{orf_counter}'] = {'location': [positions],
                                                              'frame': frame,
                                                              'strand': direction,
                                                              'conservation': conservation,
                                                              'internal': []
                                                              }
                            orf_counter += 1
                        else:
                            if orf_dict:
                                orf_dict[f'ORF_{orf_counter - 1}']['internal'].append(positions)

        return orf_dict

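    # A hedged usage sketch (hypothetical nt alignment 'aln'; keys of the returned
    # dict are 'ORF_0', 'ORF_1', ... as written above):
    #
    #   orfs = aln.get_conserved_orfs(min_length=150, identity_cutoff=90)
    #   for name, orf in orfs.items():
    #       print(name, orf['location'], orf['strand'], round(orf['conservation'], 1))
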
    def get_non_overlapping_conserved_orfs(self, min_length: int = 100, identity_cutoff: float = None) -> dict:
        """
        First calculates all ORFs and then searches from the 5' end for
        all non-overlapping orfs on the fw strand and from the
        3' end for all non-overlapping orfs on the rv strand.

        **No overlap algorithm:**
            **frame 1:** -[M------*]--- ----[M--*]---------[M-----

            **frame 2:** -------[M------*]---------[M---*]--------

            **frame 3:** [M---*]-----[M----------*]----------[M---

            **results:** [M---*][M------*]--[M--*]-[M---*]-[M-----

            frame:    3      2           1      2       1

        :return: dictionary with non-overlapping orfs
        """
        orf_dict = self.get_conserved_orfs(min_length, identity_cutoff)

        fw_orfs, rw_orfs = [], []

        for orf in orf_dict:
            if orf_dict[orf]['strand'] == '+':
                fw_orfs.append((orf, orf_dict[orf]['location'][0]))
            else:
                rw_orfs.append((orf, orf_dict[orf]['location'][0]))

        fw_orfs.sort(key=lambda x: x[1][0])  # sort by start pos
        rw_orfs.sort(key=lambda x: x[1][1], reverse=True)  # sort by stop pos
        non_overlapping_orfs = []
        for orf_list, strand in zip([fw_orfs, rw_orfs], ['+', '-']):
            previous_stop = -1 if strand == '+' else self.length + 1
            for orf in orf_list:
                if strand == '+' and orf[1][0] > previous_stop:
                    non_overlapping_orfs.append(orf[0])
                    previous_stop = orf[1][1]
                elif strand == '-' and orf[1][1] < previous_stop:
                    non_overlapping_orfs.append(orf[0])
                    previous_stop = orf[1][0]

        non_overlap_dict = {}
        for orf in orf_dict:
            if orf in non_overlapping_orfs:
                non_overlap_dict[orf] = orf_dict[orf]

        return non_overlap_dict

    def calc_length_stats(self) -> dict:
        """
        Determine the stats for the length of the ungapped seqs in the alignment.
        :return: dictionary with length stats
        """

        seq_lengths = [len(self.alignment[x].replace('-', '')) for x in self.alignment]

        return {'number of seq': len(self.alignment),
                'mean length': float(np.mean(seq_lengths)),
                'std length': float(np.std(seq_lengths)),
                'min length': int(np.min(seq_lengths)),
                'max length': int(np.max(seq_lengths))
                }

    def calc_entropy(self) -> list:
        """
        Calculate the normalized Shannon entropy for every position in an alignment:

        - 1: high entropy
        - 0: low entropy

        :return: Entropies at each position.
        """

        # helper functions
        def shannons_entropy(character_list: list, states: int, aln_type: str) -> float:
            """
            Calculate the Shannon entropy of a sequence,
            normalized between 0 and 1.
            :param character_list: characters at an alignment position
            :param states: number of potential characters that can be present
            :param aln_type: type of the alignment
            :returns: entropy
            """
            ent, n_chars = 0, len(character_list)
            # only one char is in the list
            if n_chars <= 1:
                return ent
            # calculate the number of unique chars and their counts
            chars, char_counts = np.unique(character_list, return_counts=True)
            char_counts = char_counts.astype(float)
            # ignore gaps for entropy calc
            char_counts, chars = char_counts[chars != "-"], chars[chars != "-"]
            # correctly handle ambiguous chars
            index_to_drop = []
            for index, char in enumerate(chars):
                if char in config.AMBIG_CHARS[aln_type]:
                    index_to_drop.append(index)
                    amb_chars, amb_counts = np.unique(config.AMBIG_CHARS[aln_type][char], return_counts=True)
                    amb_counts = amb_counts / len(config.AMBIG_CHARS[aln_type][char])
                    # add the proportionate numbers to the initial array
                    for amb_char, amb_count in zip(amb_chars, amb_counts):
                        if amb_char in chars:
                            char_counts[chars == amb_char] += amb_count
                        else:
                            chars, char_counts = np.append(chars, amb_char), np.append(char_counts, amb_count)
            # drop the ambiguous characters from the array
            char_counts, chars = np.delete(char_counts, index_to_drop), np.delete(chars, index_to_drop)
            # calc the entropy
            probs = char_counts / n_chars
            if np.count_nonzero(probs) <= 1:
                return ent
            for prob in probs:
                ent -= prob * math.log(prob, states)

            return ent

        aln = self.alignment
        entropys = []

        if self.aln_type == 'AA':
            states = 20
        else:
            states = 4
        # iterate over alignment positions and the sequences
        for nuc_pos in range(self.length):
            pos = []
            for record in aln:
                pos.append(aln[record][nuc_pos])
            entropys.append(shannons_entropy(pos, states, self.aln_type))

        return entropys

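    # Worked sketch of the normalization above: for a column with counts A:2, T:2
    # in a 4-sequence nt alignment (states = 4):
    #
    #   H = -(0.5 * log(0.5, 4) + 0.5 * log(0.5, 4)) = 0.5
    #
    # Using log base `states` caps the entropy at 1 when all states occur equally often.
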
    def calc_gc(self) -> list | TypeError:
        """
        Determine the GC content for every position in a nt alignment.
        :return: GC content for every position.
        :raises: TypeError for AA alignments
        """
        if self.aln_type == 'AA':
            raise TypeError("GC computation is not possible for amino acid alignments")

        gc, aln, amb_nucs = [], self.alignment, config.AMBIG_CHARS[self.aln_type]

        for position in range(self.length):
            nucleotides = str()
            for record in aln:
                nucleotides = nucleotides + aln[record][position]
            # ini dict defining which chars to count
            # and with which frequency
            to_count = {
                'G': 1,
                'C': 1,
            }
            # handle ambig. nuc
            for char in amb_nucs:
                if char in nucleotides:
                    to_count[char] = (amb_nucs[char].count('C') + amb_nucs[char].count('G')) / len(amb_nucs[char])

            gc.append(
                sum([nucleotides.count(x) * to_count[x] for x in to_count]) / len(nucleotides)
            )

        return gc

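    # Worked sketch of the ambiguity weighting above for a column containing
    # 'G', 'C', 'S' and 'R' (S = G/C, R = A/G):
    #
    #   weights: G = 1, C = 1, S = 2/2 = 1, R = 1/2
    #   GC = (1 + 1 + 1 + 0.5) / 4 = 0.875
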
    def calc_coverage(self) -> list:
        """
        Determine the coverage of every position in an alignment.
        This is defined as:
            1 - (fraction of '-' characters at the position)

        :return: Coverage at each alignment position.
        """
        coverage, aln = [], self.alignment

        for nuc_pos in range(self.length):
            pos = str()
            for record in aln.keys():
                pos = pos + aln[record][nuc_pos]
            coverage.append(1 - pos.count('-') / len(pos))

        return coverage

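    # Example: a column with the chars ['A', '-', 'A', 'A'] has a coverage of
    # 1 - 1/4 = 0.75.
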
    def calc_reverse_complement_alignment(self) -> dict | TypeError:
        """
        Reverse complement the alignment.
        :return: Alignment (rv)
        """
        if self.aln_type == 'AA':
            raise TypeError('Reverse complement only for RNA or DNA.')

        aln = self.alignment
        reverse_complement_dict = {}

        for seq_id in aln:
            reverse_complement_dict[seq_id] = ''.join(config.COMPLEMENT[base] for base in reversed(aln[seq_id]))

        return reverse_complement_dict

    def calc_numerical_alignment(self, encode_mask: bool = False, encode_ambiguities: bool = False):
        """
        Transforms the alignment to numerical values. Ambiguities are encoded as -3, mask as -2 and the
        remaining chars with the idx + 1 of config.CHAR_COLORS[self.aln_type]['standard'].

        :param encode_ambiguities: encode ambiguities as -3
        :param encode_mask: encode mask as -2
        :return: matrix
        """

        aln = self.alignment
        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
        # ini matrix
        numerical_matrix = np.full(sequences.shape, np.nan, dtype=float)
        # first encode mask
        if encode_mask:
            if self.aln_type == 'AA':
                is_n_or_x = np.isin(sequences, ['X'])
            else:
                is_n_or_x = np.isin(sequences, ['N'])
            numerical_matrix[is_n_or_x] = -2
        # next encode ambig chars
        if encode_ambiguities:
            numerical_matrix[np.isin(sequences, [key for key in config.AMBIG_CHARS[self.aln_type] if key not in ['N', 'X', '-']])] = -3
        # next convert each char into their respective values
        for idx, char in enumerate(config.CHAR_COLORS[self.aln_type]['standard']):
            numerical_matrix[np.isin(sequences, [char])] = idx + 1

        return numerical_matrix

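    # A hedged sketch of the resulting encoding (shape: n_sequences x aln_length):
    #
    #   matrix = aln.calc_numerical_alignment(encode_mask=True, encode_ambiguities=True)
    #   # -2 -> masked 'N'/'X', -3 -> other ambiguous chars,
    #   # idx + 1 -> chars of the standard color scheme, np.nan -> anything not encoded
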
    def calc_identity_alignment(self, encode_mismatches: bool = True, encode_mask: bool = False, encode_gaps: bool = True, encode_ambiguities: bool = False, encode_each_mismatch_char: bool = False) -> np.ndarray:
        """
        Converts the alignment to an identity array (identical = 0) compared to the majority consensus or the reference.

        :param encode_mismatches: encode mismatches as -1
        :param encode_mask: encode mask with value=-2 --> also in the reference
        :param encode_gaps: encode gaps with np.nan --> also in the reference
        :param encode_ambiguities: encode ambiguities with value=-3
        :param encode_each_mismatch_char: for each mismatch, encode characters separately - these values represent the idx+1 values of config.CHAR_COLORS[self.aln_type]['standard']
        :return: identity alignment
        """

        aln = self.alignment
        ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus()

        # convert alignment to array
        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
        reference = np.array(list(ref))
        # ini matrix
        identity_matrix = np.full(sequences.shape, 0, dtype=float)

        is_identical = sequences == reference

        if encode_gaps:
            is_gap = sequences == '-'
        else:
            is_gap = np.full(sequences.shape, False)

        if encode_mask:
            if self.aln_type == 'AA':
                is_n_or_x = np.isin(sequences, ['X'])
            else:
                is_n_or_x = np.isin(sequences, ['N'])
        else:
            is_n_or_x = np.full(sequences.shape, False)

        if encode_ambiguities:
            is_ambig = np.isin(sequences, [key for key in config.AMBIG_CHARS[self.aln_type] if key not in ['N', 'X', '-']])
        else:
            is_ambig = np.full(sequences.shape, False)

        if encode_mismatches:
            is_mismatch = ~is_gap & ~is_identical & ~is_n_or_x & ~is_ambig
        else:
            is_mismatch = np.full(sequences.shape, False)

        # encode every different character
        if encode_each_mismatch_char:
            for idx, char in enumerate(config.CHAR_COLORS[self.aln_type]['standard']):
                new_encoding = np.isin(sequences, [char]) & is_mismatch
                identity_matrix[new_encoding] = idx + 1
        # or encode differences with a single value
        else:
            identity_matrix[is_mismatch] = -1  # mismatch

        identity_matrix[is_gap] = np.nan  # gap
        identity_matrix[is_n_or_x] = -2  # 'N' or 'X'
        identity_matrix[is_ambig] = -3  # ambiguities

        return identity_matrix

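    # A hedged usage sketch (hypothetical 'aln'): count columns that are identical
    # to the consensus/reference in every sequence (encoded as 0 above; any gap
    # (np.nan) or mismatch disqualifies a column):
    #
    #   import numpy as np
    #   ident = aln.calc_identity_alignment()
    #   conserved_cols = int(np.sum(np.all(ident == 0, axis=0)))
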
    def calc_similarity_alignment(self, matrix_type: str | None = None, normalize: bool = True) -> np.ndarray:
        """
        Calculate the similarity score between the alignment and the reference sequence, with normalization to highlight
        differences. The similarity scores are scaled to the range [0, 1] based on the substitution matrix values for the
        reference residue at each column. Gaps are encoded as np.nan.

        The calculation follows these steps:

        1. **Reference Sequence**: If a reference sequence is provided (via `self.reference_id`), it is used. Otherwise,
           a consensus sequence is generated to serve as the reference.
        2. **Substitution Matrix**: The similarity between residues is determined using a substitution matrix, such as
           BLOSUM65 for amino acids or BLASTN for nucleotides. The matrix is loaded based on the alignment type.
        3. **Per-Column Normalization (optional)**:

        For each column in the alignment:
            - The residue in the reference sequence is treated as the baseline for that column.
            - The substitution scores for the reference residue are extracted from the substitution matrix.
            - The scores are normalized to the range [0, 1] using the minimum and maximum possible scores for the reference residue.
            - This ensures that identical residues (or those with high similarity to the reference) have high scores,
              while more dissimilar residues have lower scores.
        4. **Output**:

           - The normalized similarity scores are stored in a NumPy array.
           - Gaps (if any) or residues not present in the substitution matrix are encoded as `np.nan`.

        :param matrix_type: type of similarity score (if not set - AA: BLOSUM65, RNA/DNA: TRANS)
        :param normalize: whether to normalize the similarity scores to the range [0, 1]
        :return: A 2D NumPy array where each entry corresponds to the normalized similarity score between the aligned residue
            and the reference residue for that column. Values range from 0 (low similarity) to 1 (high similarity).
            Gaps and invalid residues are encoded as `np.nan`.
        :raises ValueError:
            If the specified substitution matrix is not available for the given alignment type.
        """

        aln = self.alignment
        ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus()
        if matrix_type is None:
            if self.aln_type == 'AA':
                matrix_type = 'BLOSUM65'
            else:
                matrix_type = 'TRANS'
        # load substitution matrix as dictionary
        try:
            subs_matrix = config.SUBS_MATRICES[self.aln_type][matrix_type]
        except KeyError:
            raise ValueError(
                f'The specified matrix does not exist for the alignment type.\nAvailable matrices for {self.aln_type} are:\n{list(config.SUBS_MATRICES[self.aln_type].keys())}'
            )

        # set dtype and convert alignment to a NumPy array for vectorized processing
        dtype = np.dtype(float, metadata={'matrix': matrix_type})
        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
        reference = np.array(list(ref))
        valid_chars = list(subs_matrix.keys())
        similarity_array = np.full(sequences.shape, np.nan, dtype=dtype)

        for j, ref_char in enumerate(reference):
            if ref_char not in valid_chars + ['-']:
                continue
            # get local min and max for the reference residue
            if normalize and ref_char != '-':
                local_scores = subs_matrix[ref_char].values()
                local_min, local_max = min(local_scores), max(local_scores)

            for i, char in enumerate(sequences[:, j]):
                if char not in valid_chars:
                    continue
                # classify the similarity as max if the reference has a gap
                similarity_score = subs_matrix[char][ref_char] if ref_char != '-' else 1
                similarity_array[i, j] = (similarity_score - local_min) / (local_max - local_min) if normalize and ref_char != '-' else similarity_score

        return similarity_array

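    # Worked normalization sketch: if the substitution scores for the reference
    # residue range from -4 (worst) to 5 (best) and the observed residue scores 2,
    # the normalized similarity is
    #
    #   (2 - (-4)) / (5 - (-4)) = 6 / 9 ≈ 0.67
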
    def calc_position_matrix(self, matrix_type: str = 'PWM') -> np.ndarray:
        """
        Calculates a position matrix of the specified type for the given alignment. The function
        supports generating matrices of types Position Frequency Matrix (PFM), Position Probability
        Matrix (PPM), Position Weight Matrix (PWM), and cumulative Information Content (IC). It validates
        the provided matrix type and includes pseudo-count adjustments to ensure robust calculations.

        :param matrix_type: Type of position matrix to calculate. Accepted values are 'PFM', 'PPM',
            'PWM', and 'IC'. Defaults to 'PWM'.
        :type matrix_type: str
        :raises ValueError: If the provided `matrix_type` is not one of the accepted values.
        :return: A numpy array representing the calculated position matrix of the specified type.
        :rtype: np.ndarray
        """

        # ini
        aln = self.alignment
        if matrix_type not in ['PFM', 'PPM', 'IC', 'PWM']:
            raise ValueError('matrix_type must be PFM, PPM, IC or PWM.')
        possible_chars = list(config.CHAR_COLORS[self.aln_type]['standard'].keys())
        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])

        # calc position frequency matrix
        pfm = np.array([np.sum(sequences == char, 0) for char in possible_chars])
        if matrix_type == 'PFM':
            return pfm

        # calc position probability matrix (probability)
        pseudo_count = 0.0001  # to avoid 0 values
        pfm = pfm + pseudo_count
        ppm_non_char_excluded = pfm / np.sum(pfm, axis=0)  # use this for pwm/ic calculation
        ppm = pfm / len(aln.keys())  # calculate the frequency based on row number
        if matrix_type == 'PPM':
            return ppm

        # calc position weight matrix (log-likelihood)
        pwm = np.log2(ppm_non_char_excluded * len(possible_chars))
        if matrix_type == 'PWM':
            return pwm

        # calc information content per position (in bits) - can be used to scale a ppm for sequence logos
        ic = np.sum(ppm_non_char_excluded * pwm, axis=0)
        return ic

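    # The matrices relate as follows (p = per-char probability at a column,
    # n = number of possible chars; pseudo-counts avoid log(0)):
    #
    #   PPM: p = counts / number of sequences
    #   PWM: log2(p * n)              # log-likelihood vs. a uniform background of 1/n
    #   IC:  sum(p * PWM) per column  # cumulative information content in bits
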
    def calc_percent_recovery(self) -> dict:
        """
        Recovery per sequence, either compared to the majority consensus seq
        or the reference seq.\n
        Defined as:\n

        `(1 - (N/X/- characters in ungapped ref regions) / (length of ungapped ref regions)) * 100`

        This is highly similar to how Nextclade calculates recovery over the reference.

        :return: dict
        """

        aln = self.alignment

        if self.reference_id is not None:
            ref = aln[self.reference_id]
        else:
            ref = self.get_consensus()  # majority consensus

        if not any(char != '-' for char in ref):
            raise ValueError("Reference sequence is entirely gapped, cannot calculate recovery.")

        # count 'N', 'X' and '-' chars in non-gapped regions
        recovery_over_ref = dict()

        # get positions of non-gap characters in the reference
        non_gap_positions = [i for i, char in enumerate(ref) if char != '-']
        cumulative_length = len(non_gap_positions)

        # calculate recovery
        for seq_id in aln:
            if seq_id == self.reference_id:
                continue
            seq = aln[seq_id]
            count_invalid = sum(
                seq[pos] == '-' or
                (seq[pos] == 'X' if self.aln_type == "AA" else seq[pos] == 'N')
                for pos in non_gap_positions
            )
            recovery_over_ref[seq_id] = (1 - count_invalid / cumulative_length) * 100

        return recovery_over_ref

    def calc_character_frequencies(self) -> dict:
        """
        Calculate the percentage of characters in the alignment:
        The frequencies are counted by seq and in total. The
        percentage of non-gap characters in the alignment is
        relative to the total number of non-gap characters.
        The gap percentage is relative to the sequence length.

        The output is a nested dictionary.

        :return: Character frequencies
        """

        aln, aln_length = self.alignment, self.length

        freqs = {'total': {'-': {'counts': 0, '% of alignment': float()}}}

        for seq_id in aln:
            freqs[seq_id], all_chars = {'-': {'counts': 0, '% of alignment': float()}}, 0
            unique_chars = set(aln[seq_id])
            for char in unique_chars:
                if char == '-':
                    continue
                # add characters to dictionaries
                if char not in freqs[seq_id]:
                    freqs[seq_id][char] = {'counts': 0, '% of non-gapped': 0}
                if char not in freqs['total']:
                    freqs['total'][char] = {'counts': 0, '% of non-gapped': 0}
                # count non-gap chars
                freqs[seq_id][char]['counts'] += aln[seq_id].count(char)
                freqs['total'][char]['counts'] += freqs[seq_id][char]['counts']
                all_chars += freqs[seq_id][char]['counts']
            # normalize counts
            for char in freqs[seq_id]:
                if char == '-':
                    continue
                freqs[seq_id][char]['% of non-gapped'] = freqs[seq_id][char]['counts'] / all_chars * 100
                freqs['total'][char]['% of non-gapped'] += freqs[seq_id][char]['% of non-gapped']
            # count gaps
            freqs[seq_id]['-']['counts'] = aln[seq_id].count('-')
            freqs['total']['-']['counts'] += freqs[seq_id]['-']['counts']
            # normalize gap counts
            freqs[seq_id]['-']['% of alignment'] = freqs[seq_id]['-']['counts'] / aln_length * 100
            freqs['total']['-']['% of alignment'] += freqs[seq_id]['-']['% of alignment']

        # normalize the total counts
        for char in freqs['total']:
            for value in freqs['total'][char]:
                if value == '% of alignment' or value == '% of non-gapped':
                    freqs['total'][char][value] = freqs['total'][char][value] / len(aln)

        return freqs

    def calc_pairwise_identity_matrix(self, distance_type: str = 'ghd') -> ndarray:
        """
        Calculate pairwise identities for an alignment. As there are different definitions of sequence identity, there are different options implemented:

        **1) ghd (global hamming distance)**: At each alignment position, check if the characters match:
        \ndistance = matches / alignment_length * 100

        **2) lhd (local hamming distance)**: Restrict the alignment to the region in both sequences that does not start or end with gaps:
        \ndistance = matches / min(5'3' ungapped seq1, 5'3' ungapped seq2) * 100

        **3) ged (gap excluded distance)**: All gaps are excluded from the alignment:
        \ndistance = matches / (matches + mismatches) * 100

        **4) gcd (gap compressed distance)**: All consecutive gaps are compressed to one mismatch:
        \ndistance = matches / gap_compressed_alignment_length * 100

        :return: array with pairwise identity values.
        """

        def hamming_distance(seq1: str, seq2: str) -> int:
            # counts matching positions (the complement of a Hamming distance)
            return sum(c1 == c2 for c1, c2 in zip(seq1, seq2))

        def ghd(seq1: str, seq2: str) -> float:
            return hamming_distance(seq1, seq2) / self.length * 100

        def lhd(seq1: str, seq2: str) -> float:
            # trim gaps from both sides
            i, j = 0, self.length - 1
            while i < self.length and (seq1[i] == '-' or seq2[i] == '-'):
                i += 1
            while j >= 0 and (seq1[j] == '-' or seq2[j] == '-'):
                j -= 1
            if i > j:
                return 0.0

            seq1_, seq2_ = seq1[i:j + 1], seq2[i:j + 1]
            matches = sum(c1 == c2 for c1, c2 in zip(seq1_, seq2_))
            length = j - i + 1
            return (matches / length) * 100 if length > 0 else 0.0

        def ged(seq1: str, seq2: str) -> float:

            matches, mismatches = 0, 0

            for c1, c2 in zip(seq1, seq2):
                if c1 != '-' and c2 != '-':
                    if c1 == c2:
                        matches += 1
                    else:
                        mismatches += 1
            return matches / (matches + mismatches) * 100 if (matches + mismatches) > 0 else 0

        def gcd(seq1: str, seq2: str) -> float:
            matches = 0
            mismatches = 0
            in_gap = False

            for char1, char2 in zip(seq1, seq2):
                if char1 == '-' and char2 == '-':  # shared gap: do nothing
                    continue
                elif char1 == '-' or char2 == '-':  # gap in only one sequence
                    if not in_gap:  # start of a new gap stretch
                        mismatches += 1
                        in_gap = True
                else:  # no gaps
                    in_gap = False
                    if char1 == char2:  # matching characters
                        matches += 1
                    else:  # mismatched characters
                        mismatches += 1

            return matches / (matches + mismatches) * 100 if (matches + mismatches) > 0 else 0

        # map distance type to corresponding function
        distance_functions: Dict[str, Callable[[str, str], float]] = {
            'ghd': ghd,
            'lhd': lhd,
            'ged': ged,
            'gcd': gcd
        }

        if distance_type not in distance_functions:
            raise ValueError(f"Invalid distance type '{distance_type}'. Choose from {list(distance_functions.keys())}.")

        # compute pairwise distances
        aln = self.alignment
        distance_func = distance_functions[distance_type]
        distance_matrix = np.zeros((len(aln), len(aln)))

        sequences = list(aln.values())
        n = len(sequences)
        for i in range(n):
            seq1 = sequences[i]
            for j in range(i, n):
                seq2 = sequences[j]
                dist = distance_func(seq1, seq2)
                distance_matrix[i, j] = dist
                distance_matrix[j, i] = dist

        return distance_matrix

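    # Worked sketch for seq1 = '--ACGT' and seq2 = 'GGACGT' (alignment length 6):
    #
    #   ghd: 4 / 6 * 100 ≈ 66.7   (gap columns count as mismatches)
    #   lhd: 4 / 4 * 100 = 100.0  (leading/trailing gap columns are trimmed)
    #   ged: 4 / 4 * 100 = 100.0  (gap columns are excluded)
    #   gcd: 4 / 5 * 100 = 80.0   (the gap stretch is compressed to one mismatch)
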
    def get_snps(self, include_ambig: bool = False) -> dict:
        """
        Calculate SNPs similar to snp-sites (output is comparable):
        https://github.com/sanger-pathogens/snp-sites
        Importantly, SNPs are only considered if at least one of the SNPs is not an ambiguous character.
        The SNPs are compared to a majority consensus sequence or to a reference if it has been set.

        :param include_ambig: Include ambiguous SNPs (default: False)
        :return: dictionary containing SNP positions and their variants including their frequency.
        """
        aln = self.alignment
        ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus()
        aln = {x: aln[x] for x in aln.keys() if x != self.reference_id}
        seq_ids = list(aln.keys())
        snp_dict = {'#CHROM': self.reference_id if self.reference_id is not None else 'consensus', 'POS': {}}

        for pos in range(self.length):
            reference_char = ref[pos]
            if not include_ambig:
                if reference_char in config.AMBIG_CHARS[self.aln_type] and reference_char != '-':
                    continue
            alt_chars, snps = [], []
            for i, seq_id in enumerate(aln.keys()):
                alt_chars.append(aln[seq_id][pos])
                if reference_char != aln[seq_id][pos]:
                    snps.append(i)
            if not snps:
                continue
            if include_ambig:
                if all(alt_chars[x] in config.AMBIG_CHARS[self.aln_type] for x in snps):
                    continue
            else:
                snps = [x for x in snps if alt_chars[x] not in config.AMBIG_CHARS[self.aln_type]]
                if not snps:
                    continue
            if pos not in snp_dict['POS']:
                snp_dict['POS'][pos] = {'ref': reference_char, 'ALT': {}}
            for snp in snps:
                if alt_chars[snp] not in snp_dict['POS'][pos]['ALT']:
                    snp_dict['POS'][pos]['ALT'][alt_chars[snp]] = {
                        'AF': 1,
                        'SEQ_ID': [seq_ids[snp]]
                    }
                else:
                    snp_dict['POS'][pos]['ALT'][alt_chars[snp]]['AF'] += 1
                    snp_dict['POS'][pos]['ALT'][alt_chars[snp]]['SEQ_ID'].append(seq_ids[snp])
            # calculate AF
            if pos in snp_dict['POS']:
                for alt in snp_dict['POS'][pos]['ALT']:
                    snp_dict['POS'][pos]['ALT'][alt]['AF'] /= len(aln)

        return snp_dict

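    # Shape of the returned dictionary (hedged sketch with hypothetical values):
    #
    #   {'#CHROM': 'consensus',
    #    'POS': {42: {'ref': 'A',
    #                 'ALT': {'G': {'AF': 0.25, 'SEQ_ID': ['seq2']}}}}}
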
    def calc_transition_transversion_score(self) -> list:
        """
        Based on the SNP positions, calculates a transition/transversion score.
        A positive score means a higher ratio of transitions and a negative score means
        a higher ratio of transversions.
        :return: list
        """

        if self.aln_type == 'AA':
            raise TypeError('TS/TV scoring only for RNA/DNA alignments')

        # ini
        snps = self.get_snps()
        score = [0] * self.length

        for pos in snps['POS']:
            for alt in snps['POS'][pos]['ALT']:
                # check the type of substitution
                if snps['POS'][pos]['ref'] + alt in ['AG', 'GA', 'CT', 'TC', 'CU', 'UC']:
                    score[pos] += snps['POS'][pos]['ALT'][alt]['AF']
                else:
                    score[pos] -= snps['POS'][pos]['ALT'][alt]['AF']

        return score


class Annotation:
    """
    An annotation class that allows reading in gff, gb or bed files and adjusting their locations to those of the MSA.
    """

    def __init__(self, aln: MSA, annotation_path: str):
        """
        The annotation class. Lets you parse multiple standard formats
        which might be used for annotating an alignment. The main purpose
        is to parse the annotation file and adapt the locations of diverse
        features to the locations within the alignment, considering the
        respective alignment positions. Importantly, the IDs of the annotation
        and the MSA have to partly match.

        :param aln: MSA class
        :param annotation_path: path to annotation file (gb, bed, gff) or raw string
        """

        self.ann_type, self._seq_id, self.locus, self.features = self._parse_annotation(annotation_path, aln)  # read annotation
        self._gapped_seq = self._MSA_validation_and_seq_extraction(aln, self._seq_id)  # extract gapped sequence
        self._position_map = self._build_position_map()  # build a position map
        self._map_to_alignment()  # adapt feature locations

1242    @staticmethod
1243    def _MSA_validation_and_seq_extraction(aln: MSA, seq_id: str) -> str:
1244        """
1245        extract gapped sequence from MSA that corresponds to annotation
1246        :param aln: MSA class
1247        :param seq_id: sequence id to extract
1248        :return: gapped sequence
1249        """
1250        if not isinstance(aln, MSA):
1251            raise ValueError('Alignment has to be an MSA object. Use explore.MSA() to read in the alignment.')
1252
1253        return aln._alignment[seq_id]
1254
1255    @staticmethod
1256    def _parse_annotation(annotation_path: str, aln: MSA) -> tuple[str, str, str, Dict]:
1257
1258        def detect_annotation_type(file_path: str) -> str:
1259            """
1260            Detect the type of annotation file (GenBank, GFF, or BED) based
1261            on the first relevant line (excluding empty and #)
1262
1263            :param file_path: Path to the annotation file.
1264            :return: The detected file type ('gb', 'gff', or 'bed').
1265
1266            :raises ValueError: If the file type cannot be determined.
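
            For illustration, how the heuristic classifies single (hypothetical)
            first lines - raw strings are accepted here because `_get_line_iterator`
            falls back to `io.StringIO`; note that this is a local helper, not part
            of the public API:

            ```python
            detect_annotation_type('LOCUS       AB000001   1000 bp')           # -> 'gb'
            detect_annotation_type('chr1\t.\tgene\t100\t200\t.\t+\t.\tID=g1')  # -> 'gff'
            detect_annotation_type('chr1\t100\t200\tmy_feature')               # -> 'bed'
            ```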
1267            """
1268
1269            with _get_line_iterator(file_path) as file:
1270                for line in file:
1271                    # skip empty lines and comments
1272                    if not line.strip() or line.startswith('#'):
1273                        continue
1274                    # GenBank
1275                    if line.startswith('LOCUS'):
1276                        return 'gb'
1277                    # gff
1278                    if len(line.split('\t')) == 9:
1279                        # Check for expected values
1280                        columns = line.split('\t')
1281                        if columns[6] in ['+', '-', '.'] and re.match(r'^\d+$', columns[3]) and re.match(r'^\d+$', columns[4]):
1282                            return 'gff'
1283                    # BED files are tab-delimited with at least 3 fields: chrom, start, end
1284                    fields = line.split('\t')
1285                    if len(fields) >= 3 and re.match(r'^\d+$', fields[1]) and re.match(r'^\d+$', fields[2]):
1286                        return 'bed'
1287                    # only read in the first line
1288                    break
1289
1290            raise ValueError(
1291                "File type could not be determined. Ensure the file follows a recognized format (GenBank, GFF, or BED).")
1292
1293        def parse_gb(file_path) -> dict:
1294            """
1295            Parse a GenBank file into a dictionary - primarily the qualifier
1296            information is retained, as it is used for plotting.
1297
1298            :param file_path: path to GenBank file
1299            :return: nested dictionary
1300
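            The returned structure looks roughly like this (locus id, feature types
            and qualifiers are placeholders and depend on the file):

            ```python
            {'AB000001': {
                'locus': 'AB000001',
                'features': {
                    'CDS': {
                        0: {'location': [[0, 300]],  # 0-based starts
                            'strand': '+',
                            'gene': 'example'}       # qualifiers, if present
                    }
                }
            }}
            ```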
1301            """
1302
1303            def sanitize_gb_location(string: str) -> tuple[list, str]:
1304                """
1305                see: https://www.insdc.org/submitting-standards/feature-table/
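
                For illustration, expected output for typical INSDC location strings:

                ```python
                sanitize_gb_location('467..588')                # -> ([[466, 588]], '+')
                sanitize_gb_location('complement(3300..4037)')  # -> ([[3299, 4037]], '-')
                sanitize_gb_location('join(12..78,134..202)')   # -> ([[11, 78], [133, 202]], '+')
                ```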
1306                """
1307                strand = '+'
1308                locations = []
1309                # check the direction of the annotation
1310                if 'complement' in string:
1311                    strand = '-'
1312                # sanitize operators
1313                for operator in ['complement(', 'join(', 'order(']:
1314                    string = string.replace(operator, '')
1315                # strip chars used to mark partial locations before splitting
1316                # start and stop - note that this discards the partiality
1317                # information encoded by '<' and '>'
1318                for char in ['>', '<', ')']:
1319                    string = string.replace(char, '')
1320                # check if we have multiple locations, e.g. due to splicing
1321                if ',' in string:
1322                    raw_locations = string.split(',')
1323                else:
1324                    raw_locations = [string]
1325                # try to split start and stop
1326                for location in raw_locations:
1327                    for sep in ['..', '.', '^']:
1328                        if sep in location:
1329                            sanitized_locations = [int(x) for x in location.split(sep)]
1330                            sanitized_locations[0] = sanitized_locations[0] - 1  # enforce 0-based starts
1331                            locations.append(sanitized_locations)
1332                            break
1333
1334                return locations, strand
1335
1336
1337            records = {}
1338            with _get_line_iterator(file_path) as file:
1339                record = None
1340                in_features = False
1341                counter_dict = {}
1342                for line in file:
1343                    line = line.rstrip()
1344                    parts = line.split()
1345                    # extract the locus id
1346                    if line.startswith('LOCUS'):
1347                        if record:
1348                            records[record['locus']] = record
1349                        record = {
1350                            'locus': parts[1],
1351                            'features': {}
1352                        }
1353
1354                    elif line.startswith('FEATURES'):
1355                        in_features = True
1356
1357                    # ignore the sequence info
1358                    elif line.startswith('ORIGIN'):
1359                        in_features = False
1360
1361                    # now write useful feature information to dictionary
1362                    elif in_features:
1363                        if not line.strip():
1364                            continue
1365                        if line[5] != ' ':
1366                            location_line = True  # remember that we are in a location for multi-line locations
1367                            feature_type, qualifier = parts[0], parts[1]
1368                            if feature_type not in record['features']:
1369                                record['features'][feature_type] = {}
1370                                counter_dict[feature_type] = 0
1371                            locations, strand = sanitize_gb_location(qualifier)
1372                            record['features'][feature_type][counter_dict[feature_type]] = {
1373                                'location': locations,
1374                                'strand': strand
1375                            }
1376                            counter_dict[feature_type] += 1
1377                        else:
1378                            # edge case for multi-line locations
1379                            if location_line and not line.strip().startswith('/'):
1380                                locations, strand = sanitize_gb_location(parts[0])
1381                                for loc in locations:
1382                                    record['features'][feature_type][counter_dict[feature_type] - 1]['location'].append(loc)  # counter was already incremented above
1383                            else:
1384                                location_line = False
1385                                try:
1386                                    qualifier_type, qualifier = parts[0].split('=', 1)
1387                                except ValueError:  # continuation of a multi-line qualifier, e.g. the translation
1388                                    qualifier = qualifier + parts[0]
1389
1390                                qualifier_type, qualifier = qualifier_type.lstrip('/'), qualifier.strip('"')
1391                                last_index = counter_dict[feature_type] - 1
1392                                record['features'][feature_type][last_index][qualifier_type] = qualifier
1393
1394            if record:
1395                records[record['locus']] = record
1396            return records
1397
1398        def parse_gff(file_path) -> dict:
1399            """
1400            Parse a GFF3 (General Feature Format) file into a dictionary structure.
1401
1402            :param file_path: path to GFF3 file
1403            :return: nested dictionary
1404
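            For illustration, a single (hypothetical) GFF line

            ```python
            'chr1\tRefSeq\tgene\t100\t200\t.\t+\t.\tID=gene1;Name=example'
            ```

            is stored as:

            ```python
            {'chr1': {'locus': 'chr1', 'features': {'gene': {0: {
                'strand': '+', 'ID': 'gene1', 'Name': 'example',
                'location': [[99, 200]]  # start converted to 0-based
            }}}}}
            ```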
1405            """
1406            records = {}
1407            with _get_line_iterator(file_path) as file:
1408                previous_id, previous_feature = None, None
1409                for line in file:
1410                    if line.startswith('#') or not line.strip():
1411                        continue
1412                    parts = line.strip().split('\t')
1413                    seqid, source, feature_type, start, end, score, strand, phase, attributes = parts
1414                    # ensure that region and source features are not named differently for gff and gb
1415                    if feature_type == 'region':
1416                        feature_type = 'source'
1417                    if seqid not in records:
1418                        records[seqid] = {'locus': seqid, 'features': {}}
1419                    if feature_type not in records[seqid]['features']:
1420                        records[seqid]['features'][feature_type] = {}
1421
1422                    feature_id = len(records[seqid]['features'][feature_type])
1423                    feature = {
1424                        'strand': strand,
1425                    }
1426
1427                    # Parse attributes into key-value pairs
1428                    for attr in attributes.split(';'):
1429                        if '=' in attr:
1430                            key, value = attr.split('=', 1)
1431                            feature[key.strip()] = value.strip()
1432
1433                    # check if features are identical --> possible splicing
1434                    if previous_id is not None and previous_feature == feature:
1435                        records[seqid]['features'][feature_type][previous_id]['location'].append([int(start)-1, int(end)])
1436                    else:
1437                        records[seqid]['features'][feature_type][feature_id] = feature
1438                        records[seqid]['features'][feature_type][feature_id]['location'] = [[int(start) - 1, int(end)]]
1439                    # remember the previous id and feature - copied without 'location',
1440                    # as that is the only differing key if the next feature is otherwise identical
1441                    previous_id, previous_feature = feature_id, {key: value for key, value in feature.items() if key != 'location'}
1442
1443            return records
1444
1445        def parse_bed(file_path) -> dict:
1446            """
1447            Parse a BED file into a dictionary structure.
1448
1449            :param file_path: path to BED file
1450            :return: nested dictionary
1451
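            For illustration, a (hypothetical) BED line with optional columns

            ```python
            'chr1\t100\t200\tmy_region\t0\t-'
            ```

            is stored as:

            ```python
            {'chr1': {'locus': 'chr1', 'features': {'region': {0: {
                'location': [[100, 200]],  # already 0-based, half-open
                'strand': '-', 'name': 'my_region', 'score': '0'
            }}}}}
            ```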
1452            """
1453            records = {}
1454            with _get_line_iterator(file_path) as file:
1455                for line in file:
1456                    if line.startswith('#') or not line.strip():
1457                        continue
1458                    parts = line.strip().split('\t')
1459                    chrom, start, end, *optional = parts
1460
1461                    if chrom not in records:
1462                        records[chrom] = {'locus': chrom, 'features': {}}
1463                    feature_type = 'region'
1464                    if feature_type not in records[chrom]['features']:
1465                        records[chrom]['features'][feature_type] = {}
1466
1467                    feature_id = len(records[chrom]['features'][feature_type])
1468                    feature = {
1469                        'location': [[int(start), int(end)]],  # BED is already 0-based, half-open - use as is
1470                        'strand': '+',  # assume '+' if not present
1471                    }
1472
1473                    # Handle optional columns (name, score, strand) --> ignore 7-12
1474                    if len(optional) >= 1:
1475                        feature['name'] = optional[0]
1476                    if len(optional) >= 2:
1477                        feature['score'] = optional[1]
1478                    if len(optional) >= 3:
1479                        feature['strand'] = optional[2]
1480
1481                    records[chrom]['features'][feature_type][feature_id] = feature
1482
1483            return records
1484
1485        parse_functions: Dict[str, Callable[[str], dict]] = {
1486            'gb': parse_gb,
1487            'bed': parse_bed,
1488            'gff': parse_gff,
1489        }
1490        # determine the annotation content -> the file should be standard formatted;
1491        # detect_annotation_type raises a ValueError for unrecognized formats,
1492        # which simply propagates to the caller
1493        annotation_type = detect_annotation_type(annotation_path)
1495
1496        # read in the annotation
1497        annotations = parse_functions[annotation_type](annotation_path)
1498
1499        # sanity check whether one of the annotation ids and alignment ids match
1500        annotation_found = False
1501        for annotation in annotations.keys():
1502            for aln_id in aln.alignment.keys():
1503                aln_id_sanitized = aln_id.split(' ')[0]
1504                # check in both directions
1505                if aln_id_sanitized in annotation or annotation in aln_id_sanitized:
1506                    annotation_found = True
1507                    break
1508            # also break the outer loop so that 'annotation' and 'aln_id' keep the matched pair
1509            if annotation_found:
1510                break
1511
1512        if not annotation_found:
1513            raise ValueError(f'the annotations of {annotation_path} do not match any ids in the MSA')
1514
1515        # return only the annotation that has been found, the respective type and the seq_id to map to
1516        return annotation_type, aln_id, annotations[annotation]['locus'], annotations[annotation]['features']
1517
1518
1519    def _build_position_map(self) -> Dict[int, int]:
1520        """
1521        Build a position map from the gapped sequence.
1522
1523        :return: dict mapping ungapped (genomic) positions to alignment (gapped) positions
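
        For illustration, the gapped sequence `A-TG` yields:

        ```python
        {0: 0, 1: 2, 2: 3, 3: 4}  # ungapped position -> alignment index
        # the final entry is the past-the-end position, mapped to len(sequence)
        ```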
1524        """
1525
1526        position_map = {}
1527        genomic_pos = 0
1528        for aln_index, char in enumerate(self._gapped_seq):
1529            if char != '-':
1530                position_map[genomic_pos] = aln_index
1531                genomic_pos += 1
1532        # ensure the last genomic position is included
1533        position_map[genomic_pos] = len(self._gapped_seq)
1534
1535        return position_map
1536
1537
1538    def _map_to_alignment(self):
1539        """
1540        Adjust all feature locations to alignment positions
1541        """
1542
1543        def map_location(position_map: Dict[int, int], locations: list) -> list:
1544            """
1545            Map genomic locations to alignment positions using a precomputed position map.
1546
1547            :param position_map: positions mapped from ungapped (genomic) to gapped (alignment) coordinates
1548            :param locations: List of genomic start and end positions.
1549            :return: List of adjusted alignment positions.
1550            """
1551
1552            aligned_locs = []
1553            for start, end in locations:
1554                try:
1555                    aligned_start = position_map[start]
1556                    aligned_end = position_map[end]
1557                    aligned_locs.append([aligned_start, aligned_end])
1558                except KeyError:
1559                    raise ValueError(f"Positions {start}-{end} lie outside of the position map.")
1560
1561            return aligned_locs
1562
1563        for feature_type, features in self.features.items():
1564            for feature_id, feature_data in features.items():
1565                original_locations = feature_data['location']
1566                aligned_locations = map_location(self._position_map, original_locations)
1567                feature_data['location'] = aligned_locations
class MSA:
  37class MSA:
  38    """
  39    An alignment class that allows computation of several stats
  40    """
  41
  42    def __init__(self, alignment_string: str, reference_id: str = None, zoom_range: tuple | int = None):
  43        """
  44        Initialise an Alignment object.
  45        :param alignment_string: Path to alignment file or raw alignment string
  46        :param reference_id: reference id
  47        :param zoom_range: start and stop positions to zoom into the alignment
  48        """
  49        self._alignment = self._read_alignment(alignment_string)
  50        self._reference_id = self._validate_ref(reference_id, self._alignment)
  51        self._zoom = self._validate_zoom(zoom_range, self._alignment)
  52        self._aln_type = self._determine_aln_type(self._alignment)
  53
  54    # TODO: read in different alignment types
  55    # Static methods
  56    @staticmethod
  57    def _read_alignment(file_path: str) -> dict:
  58        """
  59        Parse MSA alignment file.
  60        :param file_path: path to alignment file
  61        :return: dictionary with ids as keys and sequences as values
  62        """
  63
  64        def add_seq(aln: dict, sequence_id: str, seq_list: list):
  65            """
  66            Add a complete sequence and check for non-allowed chars
  67            :param aln: alignment dictionary to build
  68            :param sequence_id: sequence id to add
  69            :param seq_list: sequences to add
  70            :return: alignment with added sequences
  71            """
  72            final_seq = ''.join(seq_list).upper()
  73            # Check for non-allowed characters
  74            invalid_chars = set(final_seq) - set(config.POSSIBLE_CHARS)
  75            if invalid_chars:
  76                raise ValueError(
  77                    f"{sequence_id} contains invalid characters: {', '.join(invalid_chars)}. Allowed chars are: {config.POSSIBLE_CHARS}"
  78                )
  79            aln[sequence_id] = final_seq
  80
  81            return aln
  82
  83        alignment, seq_lines = {}, []
  84        seq_id = None
  85
  86        with _get_line_iterator(file_path) as file:
  87            for i, line in enumerate(file):
  88                line = line.strip()
  89                # initial check for fasta format
  90                if i == 0 and not line.startswith(">"):
  91                    raise ValueError('Alignment has to be in fasta format starting with >SeqID.')
  92                if line.startswith(">"):
  93                    if seq_id:
  94                        alignment = add_seq(alignment, seq_id, seq_lines)
  95                    # initialize a new sequence
  96                    seq_id, seq_lines = line[1:], []
  97                else:
  98                    seq_lines.append(line)
  99            # handle last sequence
 100            if seq_id:
 101                alignment = add_seq(alignment, seq_id, seq_lines)
 102        # final sanity checks
 103        if alignment:
 104            # alignment contains only one sequence:
 105            if len(alignment) < 2:
 106                raise ValueError("Alignment must contain more than one sequence.")
 107            # alignment sequences are not same length
 108            first_seq_len = len(next(iter(alignment.values())))
 109            for sequence_id, sequence in alignment.items():
 110                if len(sequence) != first_seq_len:
 111                    raise ValueError(
 112                        f"All alignment sequences must have the same length. Sequence '{sequence_id}' has length {len(sequence)}, expected {first_seq_len}."
 113                    )
 114            # all checks passed
 115            return alignment
 116        else:
 117            raise ValueError(f"Alignment file {file_path} does not contain any sequences in fasta format.")
 118
 119    @staticmethod
 120    def _validate_ref(reference: str | None, alignment: dict) -> str | None | ValueError:
 121        """
 122        Validate if the ref seq is indeed part of the alignment.
 123        :param reference: reference seq id
 124        :param alignment: alignment dict
 125        :return: validated reference
 126        """
 127        if reference in alignment.keys():
 128            return reference
 129        elif reference is None:
 130            return reference
 131        else:
 132            raise ValueError('Reference not in alignment.')
 133
 134    @staticmethod
 135    def _validate_zoom(zoom: tuple | int, original_aln: dict) -> ValueError | tuple | None:
 136        """
 137        Validates if the user defined zoom range is within the start, end of the initial
 138        alignment.\n
 139        :param zoom: zoom range or zoom start
 140        :param original_aln: non-zoomed alignment dict
 141        :return: validated zoom range
 142        """
 143        if zoom is not None:
 144            aln_length = len(original_aln[list(original_aln.keys())[0]])
 145            # check if only over value is provided -> stop is alignment length
 146            if isinstance(zoom, int):
 147                if 0 <= zoom < aln_length:
 148                    return zoom, aln_length - 1
 149                else:
 150                    raise ValueError('Zoom start must be within the alignment length range.')
 151            # check if more than 2 values are provided
 152            if len(zoom) != 2:
 153                raise ValueError('Zoom position have to be (zoom_start, zoom_end)')
 154            # validate zoom start/stop
 155            for position in zoom:
 156                if type(position) != int:
 157                    raise ValueError('Zoom positions have to be integers.')
 158                if position not in range(0, aln_length):
 159                    raise ValueError('Zoom position out of range')
 160
 161        return zoom
 162
 163    @staticmethod
 164    def _determine_aln_type(alignment) -> str:
 165        """
 166        Determine the most likely type of alignment
 167        if 70% of chars in the alignment are nucleotide
 168        chars it is most likely a nt alignment
 169        :return: type of alignment
 170        """
 171        counter = int()
 172        for record in alignment:
 173            if 'U' in alignment[record]:
 174                return 'RNA'
 175            counter += sum(map(alignment[record].count, ['A', 'C', 'G', 'T', 'N', '-']))
 176        # determine which is the most likely type
 177        if counter / len(alignment) >= 0.7 * len(alignment[list(alignment.keys())[0]]):
 178            return 'DNA'
 179        else:
 180            return 'AA'
 181
 182    # Properties with setters
 183    @property
 184    def reference_id(self):
 185        return self._reference_id
 186
 187    @reference_id.setter
 188    def reference_id(self, ref_id: str):
 189        """
 190        Set and validate the reference id.
 191        """
 192        self._reference_id = self._validate_ref(ref_id, self.alignment)
 193
 194    @property
 195    def zoom(self) -> tuple:
 196        return self._zoom
 197
 198    @zoom.setter
 199    def zoom(self, zoom_pos: tuple | int):
 200        """
 201        Validate if the user defined zoom range.
 202        """
 203        self._zoom = self._validate_zoom(zoom_pos, self._alignment)
 204
 205    # Property without setters
 206    @property
 207    def aln_type(self) -> str:
 208        """
 209        define the aln type:
 210        RNA, DNA or AA
 211        """
 212        return self._aln_type
 213
 214    # On the fly properties without setters
 215    @property
 216    def length(self) -> int:
 217        return len(next(iter(self.alignment.values())))
 218
 219    @property
 220    def alignment(self) -> dict:
 221        """
 222        (zoomed) version of the alignment.
 223        """
 224        if self.zoom is not None:
 225            zoomed_aln = dict()
 226            for seq in self._alignment:
 227                zoomed_aln[seq] = self._alignment[seq][self.zoom[0]:self.zoom[1]]
 228            return zoomed_aln
 229        else:
 230            return self._alignment
 231
 232    # functions for different alignment stats
 233    def get_reference_coords(self) -> tuple[int, int]:
 234        """
 235        Determine the start and end coordinates of the reference sequence
 236        defined as the first/last nucleotide in the reference sequence
 237        (excluding N and gaps).
 238
 239        :return: start, end
 240        """
 241        start, end = 0, self.length
 242
 243        if self.reference_id is None:
 244            return start, end
 245        else:
 246            # 5' --> 3'
 247            for start in range(self.length):
 248                if self.alignment[self.reference_id][start] not in ['-', 'N']:
 249                    break
 250            # 3' --> 5'
 251            for end in range(self.length - 1, 0, -1):
 252                if self.alignment[self.reference_id][end] not in ['-', 'N']:
 253                    break
 254
 255            return start, end
 256
 257    def get_consensus(self, threshold: float = None, use_ambig_nt: bool = False) -> str:
 258        """
 259        Creates a non-gapped consensus sequence.
 260
 261        :param threshold: Threshold for consensus sequence. If use_ambig_nt = True the ambig. char that encodes
 262            the nucleotides that reach a cumulative frequency >= threshold is used. Otherwise 'N' (for nt alignments)
 263            or 'X' (for as alignments) is used if none of the characters reach a cumulative frequency >= threshold.
 264        :param use_ambig_nt: Use ambiguous character nt if none of the possible nt at a alignment position
 265            has a frequency above the defined threshold.
 266        :return: consensus sequence
 267        """
 268
 269        # helper functions
 270        def determine_counts(alignment_dict: dict, position: int) -> dict:
 271            """
 272            count the number of each char at
 273            an idx of the alignment. return sorted dic.
 274            handles ambiguous nucleotides in sequences.
 275            also handles gaps.
 276            """
 277            nucleotide_list = []
 278
 279            # get all nucleotides
 280            for sequence in alignment_dict.items():
 281                nucleotide_list.append(sequence[1][position])
 282            # count occurences of nucleotides
 283            counter = dict(collections.Counter(nucleotide_list))
 284            # get permutations of an ambiguous nucleotide
 285            to_delete = []
 286            temp_dict = {}
 287            for nucleotide in counter:
 288                if nucleotide in config.AMBIG_CHARS[self.aln_type]:
 289                    to_delete.append(nucleotide)
 290                    permutations = config.AMBIG_CHARS[self.aln_type][nucleotide]
 291                    adjusted_freq = 1 / len(permutations)
 292                    for permutation in permutations:
 293                        if permutation in temp_dict:
 294                            temp_dict[permutation] += adjusted_freq
 295                        else:
 296                            temp_dict[permutation] = adjusted_freq
 297
 298            # drop ambiguous entries and add adjusted freqs to
 299            if to_delete:
 300                for i in to_delete:
 301                    counter.pop(i)
 302                for nucleotide in temp_dict:
 303                    if nucleotide in counter:
 304                        counter[nucleotide] += temp_dict[nucleotide]
 305                    else:
 306                        counter[nucleotide] = temp_dict[nucleotide]
 307
 308            return dict(sorted(counter.items(), key=lambda x: x[1], reverse=True))
 309
 310        def get_consensus_char(counts: dict, cutoff: float) -> list:
 311            """
 312            get a list of nucleotides for the consensus seq
 313            """
 314            n = 0
 315
 316            consensus_chars = []
 317            for char in counts:
 318                n += counts[char]
 319                consensus_chars.append(char)
 320                if n >= cutoff:
 321                    break
 322
 323            return consensus_chars
 324
 325        def get_ambiguous_char(nucleotides: list) -> str:
 326            """
 327            get ambiguous char from a list of nucleotides
 328            """
 329            for ambiguous, permutations in config.AMBIG_CHARS[self.aln_type].items():
 330                if set(permutations) == set(nucleotides):
 331                    break
 332
 333            return ambiguous
 334
 335        # check if params have been set correctly
 336        if threshold is not None:
 337            if threshold < 0 or threshold > 1:
 338                raise ValueError('Threshold must be between 0 and 1.')
 339        if self.aln_type == 'AA' and use_ambig_nt:
 340            raise ValueError('Ambiguous characters can not be calculated for amino acid alignments.')
 341        if threshold is None and use_ambig_nt:
 342            raise ValueError('To calculate ambiguous nucleotides, set a threshold > 0.')
 343
 344        alignment = self.alignment
 345        consensus = str()
 346
 347        if threshold is not None:
 348            consensus_cutoff = len(alignment) * threshold
 349        else:
 350            consensus_cutoff = 0
 351
 352        # built consensus sequences
 353        for idx in range(self.length):
 354            char_counts = determine_counts(alignment, idx)
 355            consensus_chars = get_consensus_char(
 356                char_counts,
 357                consensus_cutoff
 358            )
 359            if threshold != 0:
 360                if len(consensus_chars) > 1:
 361                    if use_ambig_nt:
 362                        char = get_ambiguous_char(consensus_chars)
 363                    else:
 364                        if self.aln_type == 'AA':
 365                            char = 'X'
 366                        else:
 367                            char = 'N'
 368                    consensus = consensus + char
 369                else:
 370                    consensus = consensus + consensus_chars[0]
 371            else:
 372                consensus = consensus + consensus_chars[0]
 373
 374        return consensus
 375
 376    def get_conserved_orfs(self, min_length: int = 100, identity_cutoff: float | None = None) -> dict:
 377        """
 378        **conserved ORF definition:**
 379            - conserved starts and stops
 380            - start, stop must be on the same frame
 381            - stop - start must be at least min_length
 382            - all ungapped seqs[start:stop] must have at least min_length
 383            - no ungapped seq can have a Stop in between Start Stop
 384
 385        Conservation is measured by number of positions with identical characters divided by
 386        orf slice of the alignment.
 387
 388        **Algorithm overview:**
 389            - check for conserved start and stop codons
 390            - iterate over all three frames
 391            - check each start and next sufficiently far away stop codon
 392            - check if all ungapped seqs between start and stop codon are >= min_length
 393            - check if no ungapped seq in the alignment has a stop codon
 394            - write to dictionary
 395            - classify as internal if the stop codon has already been written with a prior start
 396            - repeat for reverse complement
 397
 398        :return: ORF positions and internal ORF positions
 399        """
 400
 401        # helper functions
 402        def determine_conserved_start_stops(alignment: dict, alignment_length: int) -> tuple:
 403            """
 404            Determine all start and stop codons within an alignment.
 405            :param alignment: alignment
 406            :param alignment_length: length of alignment
 407            :return: start and stop codon positions
 408            """
 409            starts = config.START_CODONS[self.aln_type]
 410            stops = config.STOP_CODONS[self.aln_type]
 411
 412            list_of_starts, list_of_stops = [], []
 413            ref = alignment[list(alignment.keys())[0]]
 414            for nt_position in range(alignment_length):
 415                if ref[nt_position:nt_position + 3] in starts:
 416                    conserved_start = True
 417                    for sequence in alignment:
 418                        if not alignment[sequence][nt_position:].replace('-', '')[0:3] in starts:
 419                            conserved_start = False
 420                            break
 421                    if conserved_start:
 422                        list_of_starts.append(nt_position)
 423
 424                if ref[nt_position:nt_position + 3] in stops:
 425                    conserved_stop = True
 426                    for sequence in alignment:
 427                        if not alignment[sequence][nt_position:].replace('-', '')[0:3] in stops:
 428                            conserved_stop = False
 429                            break
 430                    if conserved_stop:
 431                        list_of_stops.append(nt_position)
 432
 433            return list_of_starts, list_of_stops
 434
 435        def get_ungapped_sliced_seqs(alignment: dict, start_pos: int, stop_pos: int) -> list:
 436            """
 437            get ungapped sequences starting and stop codons and eliminate gaps
 438            :param alignment: alignment
 439            :param start_pos: start codon
 440            :param stop_pos: stop codon
 441            :return: sliced sequences
 442            """
 443            ungapped_seqs = []
 444            for seq_id in alignment:
 445                ungapped_seqs.append(alignment[seq_id][start_pos:stop_pos + 3].replace('-', ''))
 446
 447            return ungapped_seqs
 448
 449        def additional_stops(ungapped_seqs: list) -> bool:
 450            """
 451            Checks for the presence of a stop codon
 452            :param ungapped_seqs: list of ungapped sequences
 453            :return: Additional stop codons (True/False)
 454            """
 455            stops = config.STOP_CODONS[self.aln_type]
 456
 457            for sliced_seq in ungapped_seqs:
 458                for position in range(0, len(sliced_seq) - 3, 3):
 459                    if sliced_seq[position:position + 3] in stops:
 460                        return True
 461            return False
 462
 463        def calculate_identity(identity_matrix: ndarray, aln_slice:list) -> float:
 464            sliced_array = identity_matrix[:,aln_slice[0]:aln_slice[1]] + 1  # identical = 0, different = -1 --> add 1
 465            return np.sum(np.all(sliced_array == 1, axis=0))/(aln_slice[1] - aln_slice[0]) * 100
 466
 467        # checks for arguments
 468        if self.aln_type == 'AA':
 469            raise TypeError('ORF search only for RNA/DNA alignments')
 470
 471        if identity_cutoff is not None:
 472            if identity_cutoff > 100 or identity_cutoff < 0:
 473                raise ValueError('conservation cutoff must be between 0 and 100')
 474
 475        if min_length <= 6 or min_length > self.length:
 476            raise ValueError(f'min_length must be between 6 and {self.length}')
 477
 478        # ini
 479        identities = self.calc_identity_alignment()
 480        alignments = [self.alignment, self.calc_reverse_complement_alignment()]
 481        aln_len = self.length
 482
 483        orf_counter = 0
 484        orf_dict = {}
 485
 486        for aln, direction in zip(alignments, ['+', '-']):
 487            # check for starts and stops in the first seq and then check if these are present in all seqs
 488            conserved_starts, conserved_stops = determine_conserved_start_stops(aln, aln_len)
 489            # check each frame
 490            for frame in (0, 1, 2):
 491                potential_starts = [x for x in conserved_starts if x % 3 == frame]
 492                potential_stops = [x for x in conserved_stops if x % 3 == frame]
 493                last_stop = -1
 494                for start in potential_starts:
 495                    # go to the next stop that is sufficiently far away in the alignment
 496                    next_stops = [x for x in potential_stops if x + 3 >= start + min_length]
 497                    if not next_stops:
 498                        continue
 499                    next_stop = next_stops[0]
 500                    ungapped_sliced_seqs = get_ungapped_sliced_seqs(aln, start, next_stop)
 501                    # re-check the lengths of all ungapped seqs
 502                    ungapped_seq_lengths = [len(x) >= min_length for x in ungapped_sliced_seqs]
 503                    if not all(ungapped_seq_lengths):
 504                        continue
 505                    # if no stop codon between start and stop --> write to dictionary
 506                    if not additional_stops(ungapped_sliced_seqs):
 507                        if direction == '+':
 508                            positions = [start, next_stop + 3]
 509                        else:
 510                            positions = [aln_len - next_stop - 3, aln_len - start]
 511                        if last_stop != next_stop:
 512                            last_stop = next_stop
 513                            conservation = calculate_identity(identities, positions)
 514                            if identity_cutoff is not None and conservation < identity_cutoff:
 515                                continue
 516                            orf_dict[f'ORF_{orf_counter}'] = {'location': [positions],
 517                                                              'frame': frame,
 518                                                              'strand': direction,
 519                                                              'conservation': conservation,
 520                                                              'internal': []
 521                                                              }
 522                            orf_counter += 1
 523                        else:
 524                            if orf_dict:
 525                                orf_dict[f'ORF_{orf_counter - 1}']['internal'].append(positions)
 526
 527        return orf_dict
 528
 529    def get_non_overlapping_conserved_orfs(self, min_length: int = 100, identity_cutoff:float = None) -> dict:
 530        """
 531        First calculates all ORFs and then searches from 5'
 532        all non-overlapping orfs in the fw strand and from the
 533        3' all non-overlapping orfs in th rw strand.
 534
 535        **No overlap algorithm:**
 536            **frame 1:** -[M------*]--- ----[M--*]---------[M-----
 537
 538            **frame 2:** -------[M------*]---------[M---*]--------
 539
 540            **frame 3:** [M---*]-----[M----------*]----------[M---
 541
 542            **results:** [M---*][M------*]--[M--*]-[M---*]-[M-----
 543
 544            frame:    3      2           1      2       1
 545
 546        :return: dictionary with non-overlapping orfs
 547        """
 548        orf_dict = self.get_conserved_orfs(min_length, identity_cutoff)
 549
 550        fw_orfs, rw_orfs = [], []
 551
 552        for orf in orf_dict:
 553            if orf_dict[orf]['strand'] == '+':
 554                fw_orfs.append((orf, orf_dict[orf]['location'][0]))
 555            else:
 556                rw_orfs.append((orf, orf_dict[orf]['location'][0]))
 557
 558        fw_orfs.sort(key=lambda x: x[1][0])  # sort by start pos
 559        rw_orfs.sort(key=lambda x: x[1][1], reverse=True)  # sort by stop pos
 560        non_overlapping_orfs = []
 561        for orf_list, strand in zip([fw_orfs, rw_orfs], ['+', '-']):
 562            previous_stop = -1 if strand == '+' else self.length + 1
 563            for orf in orf_list:
 564                if strand == '+' and orf[1][0] > previous_stop:
 565                    non_overlapping_orfs.append(orf[0])
 566                    previous_stop = orf[1][1]
 567                elif strand == '-' and orf[1][1] < previous_stop:
 568                    non_overlapping_orfs.append(orf[0])
 569                    previous_stop = orf[1][0]
 570
 571        non_overlap_dict = {}
 572        for orf in orf_dict:
 573            if orf in non_overlapping_orfs:
 574                non_overlap_dict[orf] = orf_dict[orf]
 575
 576        return non_overlap_dict
 577
 578    def calc_length_stats(self) -> dict:
 579        """
 580        Determine the stats for the length of the ungapped seqs in the alignment.
 581        :return: dictionary with length stats
 582        """
 583
 584        seq_lengths = [len(self.alignment[x].replace('-', '')) for x in self.alignment]
 585
 586        return {'number of seq': len(self.alignment),
 587                'mean length': float(np.mean(seq_lengths)),
 588                'std length': float(np.std(seq_lengths)),
 589                'min length': int(np.min(seq_lengths)),
 590                'max length': int(np.max(seq_lengths))
 591                }
 592
 593    def calc_entropy(self) -> list:
 594        """
 595        Calculate the normalized shannon's entropy for every position in an alignment:
 596
 597        - 1: high entropy
 598        - 0: low entropy
 599
 600        :return: Entropies at each position.
 601        """
 602
 603        # helper functions
 604        def shannons_entropy(character_list: list, states: int, aln_type: str) -> float:
 605            """
 606            Calculate the shannon's entropy of a sequence and
 607            normalized between 0 and 1.
 608            :param character_list: characters at an alignment position
 609            :param states: number of potential characters that can be present
 610            :param aln_type: type of the alignment
 611            :returns: entropy
 612            """
 613            ent, n_chars = 0, len(character_list)
 614            # only one char is in the list
 615            if n_chars <= 1:
 616                return ent
 617            # calculate the number of unique chars and their counts
 618            chars, char_counts = np.unique(character_list, return_counts=True)
 619            char_counts = char_counts.astype(float)
 620            # ignore gaps for entropy calc
 621            char_counts, chars = char_counts[chars != "-"], chars[chars != "-"]
 622            # correctly handle ambiguous chars
 623            index_to_drop = []
 624            for index, char in enumerate(chars):
 625                if char in config.AMBIG_CHARS[aln_type]:
 626                    index_to_drop.append(index)
 627                    amb_chars, amb_counts = np.unique(config.AMBIG_CHARS[aln_type][char], return_counts=True)
 628                    amb_counts = amb_counts / len(config.AMBIG_CHARS[aln_type][char])
 629                    # add the proportionate numbers to initial array
 630                    for amb_char, amb_count in zip(amb_chars, amb_counts):
 631                        if amb_char in chars:
 632                            char_counts[chars == amb_char] += amb_count
 633                        else:
 634                            chars, char_counts = np.append(chars, amb_char), np.append(char_counts, amb_count)
 635            # drop the ambiguous characters from array
 636            char_counts, chars = np.delete(char_counts, index_to_drop), np.delete(chars, index_to_drop)
 637            # calc the entropy
 638            probs = char_counts / n_chars
 639            if np.count_nonzero(probs) <= 1:
 640                return ent
 641            for prob in probs:
 642                ent -= prob * math.log(prob, states)
 643
 644            return ent
 645
 646        aln = self.alignment
 647        entropys = []
 648
 649        if self.aln_type == 'AA':
 650            states = 20
 651        else:
 652            states = 4
 653        # iterate over alignment positions and the sequences
 654        for nuc_pos in range(self.length):
 655            pos = []
 656            for record in aln:
 657                pos.append(aln[record][nuc_pos])
 658            entropys.append(shannons_entropy(pos, states, self.aln_type))
 659
 660        return entropys
 661
 662    def calc_gc(self) -> list | TypeError:
 663        """
 664        Determine the GC content for every position in an nt alignment.
 665        :return: GC content for every position.
 666        :raises: TypeError for AA alignments
 667        """
 668        if self.aln_type == 'AA':
 669            raise TypeError("GC computation is not possible for aminoacid alignment")
 670
 671        gc, aln, amb_nucs = [], self.alignment, config.AMBIG_CHARS[self.aln_type]
 672
 673        for position in range(self.length):
 674            nucleotides = str()
 675            for record in aln:
 676                nucleotides = nucleotides + aln[record][position]
 677            # ini dict with chars that occur and which ones to
 678            # count in which freq
 679            to_count = {
 680                'G': 1,
 681                'C': 1,
 682            }
 683            # handle ambig. nuc
 684            for char in amb_nucs:
 685                if char in nucleotides:
 686                    to_count[char] = (amb_nucs[char].count('C') + amb_nucs[char].count('G')) / len(amb_nucs[char])
 687
 688            gc.append(
 689                sum([nucleotides.count(x) * to_count[x] for x in to_count]) / len(nucleotides)
 690            )
 691
 692        return gc
 693
 694    def calc_coverage(self) -> list:
 695        """
 696        Determine the coverage of every position in an alignment.
 697        This is defined as:
 698            1 - cumulative length of '-' characters
 699
 700        :return: Coverage at each alignment position.
 701        """
 702        coverage, aln = [], self.alignment
 703
 704        for nuc_pos in range(self.length):
 705            pos = str()
 706            for record in aln.keys():
 707                pos = pos + aln[record][nuc_pos]
 708            coverage.append(1 - pos.count('-') / len(pos))
 709
 710        return coverage
 711
 712    def calc_reverse_complement_alignment(self) -> dict | TypeError:
 713        """
 714        Reverse complement the alignment.
 715        :return: Alignment (rv)
 716        """
 717        if self.aln_type == 'AA':
 718            raise TypeError('Reverse complement only for RNA or DNA.')
 719
 720        aln = self.alignment
 721        reverse_complement_dict = {}
 722
 723        for seq_id in aln:
 724            reverse_complement_dict[seq_id] = ''.join(config.COMPLEMENT[base] for base in reversed(aln[seq_id]))
 725
 726        return reverse_complement_dict
 727
 728    def calc_numerical_alignment(self, encode_mask:bool=False, encode_ambiguities:bool=False):
 729        """
 730        Transforms the alignment to numerical values. Ambiguities are encoded as -3, mask as -2 and the
 731        remaining chars with the idx + 1 of config.CHAR_COLORS[self.aln_type]['standard'].
 732
 733        :param encode_ambiguities: encode ambiguities as -2
 734        :param encode_mask: encode mask with as -3
 735        :returns matrix
 736        """
 737
 738        aln = self.alignment
 739        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
 740        # ini matrix
 741        numerical_matrix = np.full(sequences.shape, np.nan, dtype=float)
 742        # first encode mask
 743        if encode_mask:
 744            if self.aln_type == 'AA':
 745                is_n_or_x = np.isin(sequences, ['X'])
 746            else:
 747                is_n_or_x = np.isin(sequences, ['N'])
 748            numerical_matrix[is_n_or_x] = -2
 749        # next encode ambig chars
 750        if encode_ambiguities:
 751            numerical_matrix[np.isin(sequences, [key for key in config.AMBIG_CHARS[self.aln_type] if key not in ['N', 'X', '-']])] = -3
 752        # next convert each char into their respective values
 753        for idx, char in enumerate(config.CHAR_COLORS[self.aln_type]['standard']):
 754            numerical_matrix[np.isin(sequences, [char])] = idx + 1
 755
 756        return numerical_matrix
 757
 758    def calc_identity_alignment(self, encode_mismatches:bool=True, encode_mask:bool=False, encode_gaps:bool=True, encode_ambiguities:bool=False, encode_each_mismatch_char:bool=False) -> np.ndarray:
 759        """
 760        Converts alignment to identity array (identical=0) compared to majority consensus or reference:\n
 761
 762        :param encode_mismatches: encode mismatch as -1
 763        :param encode_mask: encode mask with value=-2 --> also in the reference
 764        :param encode_gaps: encode gaps with np.nan --> also in the reference
 765        :param encode_ambiguities: encode ambiguities with value=-3
 766        :param encode_each_mismatch_char: for each mismatch encode characters separately - these values represent the idx+1 values of config.CHAR_COLORS[self.aln_type]['standard']
 767        :return: identity alignment
 768        """
 769
 770        aln = self.alignment
 771        ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus()
 772
 773        # convert alignment to array
 774        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
 775        reference = np.array(list(ref))
 776        # ini matrix
 777        identity_matrix = np.full(sequences.shape, 0, dtype=float)
 778
 779        is_identical = sequences == reference
 780
 781        if encode_gaps:
 782            is_gap = sequences == '-'
 783        else:
 784            is_gap = np.full(sequences.shape, False)
 785
 786        if encode_mask:
 787            if self.aln_type == 'AA':
 788                is_n_or_x = np.isin(sequences, ['X'])
 789            else:
 790                is_n_or_x = np.isin(sequences, ['N'])
 791        else:
 792            is_n_or_x = np.full(sequences.shape, False)
 793
 794        if encode_ambiguities:
 795            is_ambig = np.isin(sequences, [key for key in config.AMBIG_CHARS[self.aln_type] if key not in ['N', 'X', '-']])
 796        else:
 797            is_ambig = np.full(sequences.shape, False)
 798
 799        if encode_mismatches:
 800            is_mismatch = ~is_gap & ~is_identical & ~is_n_or_x & ~is_ambig
 801        else:
 802            is_mismatch = np.full(sequences.shape, False)
 803
 804        # encode every different character
 805        if encode_each_mismatch_char:
 806            for idx, char in enumerate(config.CHAR_COLORS[self.aln_type]['standard']):
 807                new_encoding = np.isin(sequences, [char]) & is_mismatch
 808                identity_matrix[new_encoding] = idx + 1
 809        # or encode different with a single value
 810        else:
 811            identity_matrix[is_mismatch] = -1  # mismatch
 812
 813        identity_matrix[is_gap] = np.nan  # gap
 814        identity_matrix[is_n_or_x] = -2  # 'N' or 'X'
 815        identity_matrix[is_ambig] = -3  # ambiguities
 816
 817        return identity_matrix
 818
 819    def calc_similarity_alignment(self, matrix_type:str|None=None, normalize:bool=True) -> np.ndarray:
 820        """
 821        Calculate the similarity score between the alignment and the reference sequence, with normalization to highlight
 822        differences. The similarity scores are scaled to the range [0, 1] based on the substitution matrix values for the
 823        reference residue at each column. Gaps are encoded as np.nan.
 824
 825        The calculation follows these steps:
 826
 827        1. **Reference Sequence**: If a reference sequence is provided (via `self.reference_id`), it is used. Otherwise,
 828           a consensus sequence is generated to serve as the reference.
 829        2. **Substitution Matrix**: The similarity between residues is determined using a substitution matrix, such as
 830           BLOSUM65 for amino acids or BLASTN for nucleotides. The matrix is loaded based on the alignment type.
 831        3. **Per-Column Normalization (optional)**:
 832
 833        For each column in the alignment:
 834            - The residue in the reference sequence is treated as the baseline for that column.
 835            - The substitution scores for the reference residue are extracted from the substitution matrix.
 836            - The scores are normalized to the range [0, 1] using the minimum and maximum possible scores for the reference residue.
 837            - This ensures that identical residues (or those with high similarity to the reference) have high scores,
 838            while more dissimilar residues have lower scores.
 839        4. **Output**:
 840
 841           - The normalized similarity scores are stored in a NumPy array.
 842           - Gaps (if any) or residues not present in the substitution matrix are encoded as `np.nan`.
 843
 844        :param: matrix_type: type of similarity score (if not set - AA: BLOSSUM65, RNA/DNA: BLASTN)
 845        :param: normalize: whether to normalize the similarity scores to range [0, 1]
 846        :return: A 2D NumPy array where each entry corresponds to the normalized similarity score between the aligned residue
 847            and the reference residue for that column. Values range from 0 (low similarity) to 1 (high similarity).
 848            Gaps and invalid residues are encoded as `np.nan`.
 849        :raise: ValueError
 850            If the specified substitution matrix is not available for the given alignment type.
 851        """
 852
 853        aln = self.alignment
 854        ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus()
 855        if matrix_type is None:
 856            if self.aln_type == 'AA':
 857                matrix_type = 'BLOSUM65'
 858            else:
 859                matrix_type = 'TRANS'
 860        # load substitution matrix as dictionary
 861        try:
 862            subs_matrix = config.SUBS_MATRICES[self.aln_type][matrix_type]
 863        except KeyError:
 864            raise ValueError(
 865                f'The specified matrix does not exist for alignment type.\nAvailable matrices for {self.aln_type} are:\n{list(config.SUBS_MATRICES[self.aln_type].keys())}'
 866            )
 867
 868        # set dtype and convert alignment to a NumPy array for vectorized processing
 869        dtype = np.dtype(float, metadata={'matrix': matrix_type})
 870        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
 871        reference = np.array(list(ref))
 872        valid_chars = list(subs_matrix.keys())
 873        similarity_array = np.full(sequences.shape, np.nan, dtype=dtype)
 874
 875        for j, ref_char in enumerate(reference):
 876            if ref_char not in valid_chars + ['-']:
 877                continue
 878            # Get local min and max for the reference residue
 879            if normalize and ref_char != '-':
 880                local_scores = subs_matrix[ref_char].values()
 881                local_min, local_max = min(local_scores), max(local_scores)
 882
 883            for i, char in enumerate(sequences[:, j]):
 884                if char not in valid_chars:
 885                    continue
 886                # classify the similarity as max if the reference has a gap
 887                similarity_score = subs_matrix[char][ref_char] if ref_char != '-' else 1
 888                similarity_array[i, j] = (similarity_score - local_min) / (local_max - local_min) if normalize and ref_char != '-' else similarity_score
 889
 890        return similarity_array
 891
 892    def calc_position_matrix(self, matrix_type:str='PWM') -> np.ndarray | ValueError:
 893        """
 894        Calculates a position matrix of the specified type for the given alignment. The function
 895        supports generating matrices of types Position Frequency Matrix (PFM), Position Probability
 896        Matrix (PPM), Position Weight Matrix (PWM), and cummulative Information Content (IC). It validates
 897        the provided matrix type and includes pseudo-count adjustments to ensure robust calculations.
 898
 899        :param matrix_type: Type of position matrix to calculate. Accepted values are 'PFM', 'PPM',
 900            'PWM', and 'IC'. Defaults to 'PWM'.
 901        :type matrix_type: str
 902        :raises ValueError: If the provided `matrix_type` is not one of the accepted values.
 903        :return: A numpy array representing the calculated position matrix of the specified type.
 904        :rtype: np.ndarray
 905        """
 906
 907        # ini
 908        aln = self.alignment
 909        if matrix_type not in ['PFM', 'PPM', 'IC', 'PWM']:
 910            raise ValueError('Matrix_type must be PFM, PPM, IC or PWM.')
 911        possible_chars = list(config.CHAR_COLORS[self.aln_type]['standard'].keys())
 912        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
 913
 914        # calc position frequency matrix
 915        pfm = np.array([np.sum(sequences == char, 0) for char in possible_chars])
 916        if matrix_type == 'PFM':
 917            return pfm
 918
 919        # calc position probability matrix (probability)
 920        pseudo_count = 0.0001  # to avoid 0 values
 921        pfm = pfm + pseudo_count
 922        ppm_non_char_excluded = pfm/np.sum(pfm, axis=0)  # use this for pwm/ic calculation
 923        ppm = pfm/len(aln.keys())  # calculate the frequency based on row number
 924        if matrix_type == 'PPM':
 925            return ppm
 926
 927        # calc position weight matrix (log-likelihood)
 928        pwm = np.log2(ppm_non_char_excluded * len(possible_chars))
 929        if matrix_type == 'PWM':
 930            return pwm
 931
 932        # calc information content per position (in bits) - can be used to scale a ppm for sequence logos
 933        ic = np.sum(ppm_non_char_excluded * pwm, axis=0)
 934        if matrix_type == 'IC':
 935            return ic
 936
 937        return None
 938
 939    def calc_percent_recovery(self) -> dict:
 940        """
 941        Recovery per sequence compared either to the majority consensus seq
 942        or the reference seq.\n
 943        Defined as:\n
 944
 945        `(1 - (number of N/X/- characters at ungapped reference positions) / (ungapped reference length)) * 100`
 946
 947        This is highly similar to how Nextclade calculates recovery over the reference.
 948
 949        :return: dict
 950        """
 951
 952        aln = self.alignment
 953
 954        if self.reference_id is not None:
 955            ref = aln[self.reference_id]
 956        else:
 957            ref = self.get_consensus()  # majority consensus
 958
 959        if not any(char != '-' for char in ref):
 960            raise ValueError("Reference sequence is entirely gapped, cannot calculate recovery.")
 961
 962
 963        # count 'N', 'X' and '-' chars in non-gapped regions
 964        recovery_over_ref = dict()
 965
 966        # Get positions of non-gap characters in the reference
 967        non_gap_positions = [i for i, char in enumerate(ref) if char != '-']
 968        cumulative_length = len(non_gap_positions)
 969
 970        # Calculate recovery
 971        for seq_id in aln:
 972            if seq_id == self.reference_id:
 973                continue
 974            seq = aln[seq_id]
 975            count_invalid = sum(
 976                seq[pos] == '-' or
 977                (seq[pos] == 'X' if self.aln_type == "AA" else seq[pos] == 'N')
 978                for pos in non_gap_positions
 979            )
 980            recovery_over_ref[seq_id] = (1 - count_invalid / cumulative_length) * 100
 981
 982        return recovery_over_ref
 983
 984    def calc_character_frequencies(self) -> dict:
 985        """
 986        Calculate the character frequencies in the alignment:
 987        The frequencies are counted per seq and in total. The
 988        percentage of non-gap characters in the alignment is
 989        relative to the total number of non-gap characters.
 990        The gap percentage is relative to the sequence length.
 991
 992        The output is a nested dictionary.
 993
 994        :return: Character frequencies
 995        """
 996
 997        aln, aln_length = self.alignment, self.length
 998
 999        freqs = {'total': {'-': {'counts': 0, '% of alignment': float()}}}
1000
1001        for seq_id in aln:
1002            freqs[seq_id], all_chars = {'-': {'counts': 0, '% of alignment': float()}}, 0
1003            unique_chars = set(aln[seq_id])
1004            for char in unique_chars:
1005                if char == '-':
1006                    continue
1007                # add characters to dictionaries
1008                if char not in freqs[seq_id]:
1009                    freqs[seq_id][char] = {'counts': 0, '% of non-gapped': 0}
1010                if char not in freqs['total']:
1011                    freqs['total'][char] = {'counts': 0, '% of non-gapped': 0}
1012                # count non-gap chars
1013                freqs[seq_id][char]['counts'] += aln[seq_id].count(char)
1014                freqs['total'][char]['counts'] += freqs[seq_id][char]['counts']
1015                all_chars += freqs[seq_id][char]['counts']
1016            # normalize counts
1017            for char in freqs[seq_id]:
1018                if char == '-':
1019                    continue
1020                freqs[seq_id][char]['% of non-gapped'] = freqs[seq_id][char]['counts'] / all_chars * 100
1021                freqs['total'][char]['% of non-gapped'] += freqs[seq_id][char]['% of non-gapped']
1022            # count gaps
1023            freqs[seq_id]['-']['counts'] = aln[seq_id].count('-')
1024            freqs['total']['-']['counts'] += freqs[seq_id]['-']['counts']
1025            # normalize gap counts
1026            freqs[seq_id]['-']['% of alignment'] = freqs[seq_id]['-']['counts'] / aln_length * 100
1027            freqs['total']['-']['% of alignment'] += freqs[seq_id]['-']['% of alignment']
1028
1029        # normalize the total counts
1030        for char in freqs['total']:
1031            for value in freqs['total'][char]:
1032                if value == '% of alignment' or value == '% of non-gapped':
1033                    freqs['total'][char][value] = freqs['total'][char][value] / len(aln)
1034
1035        return freqs
1036
1037    def calc_pairwise_identity_matrix(self, distance_type:str='ghd') -> ndarray:
1038        """
1039        Calculate pairwise identities for an alignment. Since there are several definitions of sequence identity, multiple options are implemented:
1040
1041        **1) ghd (global hamming distance)**: At each alignment position, check if characters match:
1042        \ndistance = matches / alignment_length * 100
1043
1044        **2) lhd (local hamming distance)**: Restrict the alignment to the region in both sequences that do not start and end with gaps:
1045        \ndistance = matches / (length of the 5'/3' gap-trimmed alignment region) * 100
1046
1047        **3) ged (gap excluded distance)**: All gaps are excluded from the alignment
1048        \ndistance = matches / (matches + mismatches) * 100
1049
1050        **4) gcd (gap compressed distance)**: All consecutive gaps are compressed to one mismatch.
1051        \ndistance = matches / gap_compressed_alignment_length * 100
1052
1053        :return: array with pairwise identities (in percent).
1054        """
1055
1056        def hamming_distance(seq1: str, seq2: str) -> int:
1057            return sum(c1 == c2 for c1, c2 in zip(seq1, seq2))  # note: counts matching positions, not differences
1058
1059        def ghd(seq1: str, seq2: str) -> float:
1060            return hamming_distance(seq1, seq2) / self.length * 100
1061
1062        def lhd(seq1: str, seq2: str) -> float:
1063            # Trim gaps from both sides
1064            i, j = 0, self.length - 1
1065            while i < self.length and (seq1[i] == '-' or seq2[i] == '-'):
1066                i += 1
1067            while j >= 0 and (seq1[j] == '-' or seq2[j] == '-'):
1068                j -= 1
1069            if i > j:
1070                return 0.0
1071
1072            seq1_, seq2_ = seq1[i:j + 1], seq2[i:j + 1]
1073            matches = sum(c1 == c2 for c1, c2 in zip(seq1_, seq2_))
1074            length = j - i + 1
1075            return (matches / length) * 100 if length > 0 else 0.0
1076
1077        def ged(seq1: str, seq2: str) -> float:
1078
1079            matches, mismatches = 0, 0
1080
1081            for c1, c2 in zip(seq1, seq2):
1082                if c1 != '-' and c2 != '-':
1083                    if c1 == c2:
1084                        matches += 1
1085                    else:
1086                        mismatches += 1
1087            return matches / (matches + mismatches) * 100 if (matches + mismatches) > 0 else 0
1088
1089        def gcd(seq1: str, seq2: str) -> float:
1090            matches = 0
1091            mismatches = 0
1092            in_gap = False
1093
1094            for char1, char2 in zip(seq1, seq2):
1095                if char1 == '-' and char2 == '-':  # Shared gap: do nothing
1096                    continue
1097                elif char1 == '-' or char2 == '-':  # Gap in only one sequence
1098                    if not in_gap:  # Start of a new gap stretch
1099                        mismatches += 1
1100                        in_gap = True
1101                else:  # No gaps
1102                    in_gap = False
1103                    if char1 == char2:  # Matching characters
1104                        matches += 1
1105                    else:  # Mismatched characters
1106                        mismatches += 1
1107
1108            return matches / (matches + mismatches) * 100 if (matches + mismatches) > 0 else 0
1109
1110
1111        # Map distance type to corresponding function
1112        distance_functions: Dict[str, Callable[[str, str], float]] = {
1113            'ghd': ghd,
1114            'lhd': lhd,
1115            'ged': ged,
1116            'gcd': gcd
1117        }
1118
1119        if distance_type not in distance_functions:
1120            raise ValueError(f"Invalid distance type '{distance_type}'. Choose from {list(distance_functions.keys())}.")
1121
1122        # Compute pairwise distances
1123        aln = self.alignment
1124        distance_func = distance_functions[distance_type]
1125        distance_matrix = np.zeros((len(aln), len(aln)))
1126
1127        sequences = list(aln.values())
1128        n = len(sequences)
1129        for i in range(n):
1130            seq1 = sequences[i]
1131            for j in range(i, n):
1132                seq2 = sequences[j]
1133                dist = distance_func(seq1, seq2)
1134                distance_matrix[i, j] = dist
1135                distance_matrix[j, i] = dist
1136
1137        return distance_matrix
1138
1139    def get_snps(self, include_ambig:bool=False) -> dict:
1140        """
1141        Calculate snps similar to snp-sites (output is comparable):
1142        https://github.com/sanger-pathogens/snp-sites
1143        Importantly, SNPs are only considered if at least one of the variants is not an ambiguous character.
1144        The SNPs are compared to a majority consensus sequence or to a reference if it has been set.
1145
1146        :param include_ambig: Include ambiguous snps (default: False)
1147        :return: dictionary containing snp positions and their variants including their frequency.
1148        """
1149        aln = self.alignment
1150        ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus()
1151        aln = {x: aln[x] for x in aln.keys() if x != self.reference_id}
1152        seq_ids = list(aln.keys())
1153        snp_dict = {'#CHROM': self.reference_id if self.reference_id is not None else 'consensus', 'POS': {}}
1154
1155        for pos in range(self.length):
1156            reference_char = ref[pos]
1157            if not include_ambig:
1158                if reference_char in config.AMBIG_CHARS[self.aln_type] and reference_char != '-':
1159                    continue
1160            alt_chars, snps = [], []
1161            for i, seq_id in enumerate(aln.keys()):
1162                alt_chars.append(aln[seq_id][pos])
1163                if reference_char != aln[seq_id][pos]:
1164                    snps.append(i)
1165            if not snps:
1166                continue
1167            if include_ambig:
1168                if all(alt_chars[x] in config.AMBIG_CHARS[self.aln_type] for x in snps):
1169                    continue
1170            else:
1171                snps = [x for x in snps if alt_chars[x] not in config.AMBIG_CHARS[self.aln_type]]
1172                if not snps:
1173                    continue
1174            if pos not in snp_dict['POS']:
1175                snp_dict['POS'][pos] = {'ref': reference_char, 'ALT': {}}
1176            for snp in snps:
1177                if alt_chars[snp] not in snp_dict['POS'][pos]['ALT']:
1178                    snp_dict['POS'][pos]['ALT'][alt_chars[snp]] = {
1179                        'AF': 1,
1180                        'SEQ_ID': [seq_ids[snp]]
1181                    }
1182                else:
1183                    snp_dict['POS'][pos]['ALT'][alt_chars[snp]]['AF'] += 1
1184                    snp_dict['POS'][pos]['ALT'][alt_chars[snp]]['SEQ_ID'].append(seq_ids[snp])
1185            # calculate AF
1186            if pos in snp_dict['POS']:
1187                for alt in snp_dict['POS'][pos]['ALT']:
1188                    snp_dict['POS'][pos]['ALT'][alt]['AF'] /= len(aln)
1189
1190        return snp_dict
1191
1192    def calc_transition_transversion_score(self) -> list:
1193        """
1194        Based on the SNP positions, calculates a transition/transversion score.
1195        A positive score means a higher ratio of transitions and a negative score
1196        a higher ratio of transversions.
1197        :return: list
1198        """
1199
1200        if self.aln_type == 'AA':
1201            raise TypeError('TS/TV scoring only for RNA/DNA alignments')
1202
1203        # ini
1204        snps = self.get_snps()
1205        score = [0]*self.length
1206
1207        for pos in snps['POS']:
1208            # transitions add to the score, transversions subtract from it
1209            for alt in snps['POS'][pos]['ALT']:
1210                # check the type of substitution
1211                if snps['POS'][pos]['ref'] + alt in ['AG', 'GA', 'CT', 'TC', 'CU', 'UC']:
1212                    score[pos] += snps['POS'][pos]['ALT'][alt]['AF']
1213                else:
1214                    score[pos] -= snps['POS'][pos]['ALT'][alt]['AF']
1215
1216        return score

An alignment class that allows computation of several stats

MSA( alignment_string: str, reference_id: str = None, zoom_range: tuple | int = None)
42    def __init__(self, alignment_string: str, reference_id: str = None, zoom_range: tuple | int = None):
43        """
44        Initialise an Alignment object.
45        :param alignment_string: Path to alignment file or raw alignment string
46        :param reference_id: reference id
47        :param zoom_range: start and stop positions to zoom into the alignment
48        """
49        self._alignment = self._read_alignment(alignment_string)
50        self._reference_id = self._validate_ref(reference_id, self._alignment)
51        self._zoom = self._validate_zoom(zoom_range, self._alignment)
52        self._aln_type = self._determine_aln_type(self._alignment)

Initialise an Alignment object.

Parameters
  • alignment_string: Path to alignment file or raw alignment string
  • reference_id: reference id
  • zoom_range: start and stop positions to zoom into the alignment
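
A minimal usage sketch (not part of the original docstring): since the reader accepts raw strings as well as paths, the alignment below is passed as an in-memory FASTA string; 'msa.fasta' is a hypothetical file name.

    from msaexplorer.explore import MSA

    # three aligned toy DNA sequences, passed as a raw FASTA string
    aln = MSA('>seq1\nATG-CTA\n>seq2\nATGACTA\n>seq3\nATGTCTA\n', reference_id='seq1')
    print(aln.aln_type, aln.length)

    # a file path works the same way (hypothetical file):
    # aln = MSA('msa.fasta', zoom_range=(0, 500))
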
reference_id
183    @property
184    def reference_id(self):
185        return self._reference_id

Set and validate the reference id.

zoom: tuple
194    @property
195    def zoom(self) -> tuple:
196        return self._zoom

Validate the user-defined zoom range.

aln_type: str
206    @property
207    def aln_type(self) -> str:
208        """
209        define the aln type:
210        RNA, DNA or AA
211        """
212        return self._aln_type

define the aln type: RNA, DNA or AA

length: int
215    @property
216    def length(self) -> int:
217        return len(next(iter(self.alignment.values())))
alignment: dict
219    @property
220    def alignment(self) -> dict:
221        """
222        (zoomed) version of the alignment.
223        """
224        if self.zoom is not None:
225            zoomed_aln = dict()
226            for seq in self._alignment:
227                zoomed_aln[seq] = self._alignment[seq][self.zoom[0]:self.zoom[1]]
228            return zoomed_aln
229        else:
230            return self._alignment

(zoomed) version of the alignment.

def get_reference_coords(self) -> tuple[int, int]:
233    def get_reference_coords(self) -> tuple[int, int]:
234        """
235        Determine the start and end coordinates of the reference sequence
236        defined as the first/last nucleotide in the reference sequence
237        (excluding N and gaps).
238
239        :return: start, end
240        """
241        start, end = 0, self.length
242
243        if self.reference_id is None:
244            return start, end
245        else:
246            # 5' --> 3'
247            for start in range(self.length):
248                if self.alignment[self.reference_id][start] not in ['-', 'N']:
249                    break
250            # 3' --> 5'
251            for end in range(self.length - 1, 0, -1):
252                if self.alignment[self.reference_id][end] not in ['-', 'N']:
253                    break
254
255            return start, end

Determine the start and end coordinates of the reference sequence defined as the first/last nucleotide in the reference sequence (excluding N and gaps).

Returns

start, end
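
For illustration (reusing the `aln` object from the constructor sketch above, where a reference was set):

    # first and last reference positions that are neither '-' nor 'N'
    start, end = aln.get_reference_coords()
    ref_slice = aln.alignment[aln.reference_id][start:end + 1]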

def get_consensus(self, threshold: float = None, use_ambig_nt: bool = False) -> str:
257    def get_consensus(self, threshold: float = None, use_ambig_nt: bool = False) -> str:
258        """
259        Creates a non-gapped consensus sequence.
260
261        :param threshold: Threshold for consensus sequence. If use_ambig_nt = True the ambig. char that encodes
262            the nucleotides that reach a cumulative frequency >= threshold is used. Otherwise 'N' (for nt alignments)
263            or 'X' (for AA alignments) is used if none of the characters reach a cumulative frequency >= threshold.
264        :param use_ambig_nt: Use an ambiguous nt character if none of the possible nt at an alignment position
265            has a frequency above the defined threshold.
266        :return: consensus sequence
267        """
268
269        # helper functions
270        def determine_counts(alignment_dict: dict, position: int) -> dict:
271            """
272            count the number of each char at
273            an idx of the alignment. return sorted dic.
274            handles ambiguous nucleotides in sequences.
275            also handles gaps.
276            """
277            nucleotide_list = []
278
279            # get all nucleotides
280            for sequence in alignment_dict.items():
281                nucleotide_list.append(sequence[1][position])
282            # count occurences of nucleotides
283            counter = dict(collections.Counter(nucleotide_list))
284            # get permutations of an ambiguous nucleotide
285            to_delete = []
286            temp_dict = {}
287            for nucleotide in counter:
288                if nucleotide in config.AMBIG_CHARS[self.aln_type]:
289                    to_delete.append(nucleotide)
290                    permutations = config.AMBIG_CHARS[self.aln_type][nucleotide]
291                    adjusted_freq = 1 / len(permutations)
292                    for permutation in permutations:
293                        if permutation in temp_dict:
294                            temp_dict[permutation] += adjusted_freq
295                        else:
296                            temp_dict[permutation] = adjusted_freq
297
298            # drop ambiguous entries and add adjusted freqs to
299            if to_delete:
300                for i in to_delete:
301                    counter.pop(i)
302                for nucleotide in temp_dict:
303                    if nucleotide in counter:
304                        counter[nucleotide] += temp_dict[nucleotide]
305                    else:
306                        counter[nucleotide] = temp_dict[nucleotide]
307
308            return dict(sorted(counter.items(), key=lambda x: x[1], reverse=True))
309
310        def get_consensus_char(counts: dict, cutoff: float) -> list:
311            """
312            get a list of nucleotides for the consensus seq
313            """
314            n = 0
315
316            consensus_chars = []
317            for char in counts:
318                n += counts[char]
319                consensus_chars.append(char)
320                if n >= cutoff:
321                    break
322
323            return consensus_chars
324
325        def get_ambiguous_char(nucleotides: list) -> str:
326            """
327            get ambiguous char from a list of nucleotides
328            """
329            for ambiguous, permutations in config.AMBIG_CHARS[self.aln_type].items():
330                if set(permutations) == set(nucleotides):
331                    break
332
333            return ambiguous
334
335        # check if params have been set correctly
336        if threshold is not None:
337            if threshold < 0 or threshold > 1:
338                raise ValueError('Threshold must be between 0 and 1.')
339        if self.aln_type == 'AA' and use_ambig_nt:
340            raise ValueError('Ambiguous characters can not be calculated for amino acid alignments.')
341        if threshold is None and use_ambig_nt:
342            raise ValueError('To calculate ambiguous nucleotides, set a threshold > 0.')
343
344        alignment = self.alignment
345        consensus = str()
346
347        if threshold is not None:
348            consensus_cutoff = len(alignment) * threshold
349        else:
350            consensus_cutoff = 0
351
352        # build the consensus sequence
353        for idx in range(self.length):
354            char_counts = determine_counts(alignment, idx)
355            consensus_chars = get_consensus_char(
356                char_counts,
357                consensus_cutoff
358            )
359            if threshold != 0:
360                if len(consensus_chars) > 1:
361                    if use_ambig_nt:
362                        char = get_ambiguous_char(consensus_chars)
363                    else:
364                        if self.aln_type == 'AA':
365                            char = 'X'
366                        else:
367                            char = 'N'
368                    consensus = consensus + char
369                else:
370                    consensus = consensus + consensus_chars[0]
371            else:
372                consensus = consensus + consensus_chars[0]
373
374        return consensus

Creates a non-gapped consensus sequence.

Parameters
  • threshold: Threshold for consensus sequence. If use_ambig_nt = True the ambig. char that encodes the nucleotides that reach a cumulative frequency >= threshold is used. Otherwise 'N' (for nt alignments) or 'X' (for AA alignments) is used if none of the characters reach a cumulative frequency >= threshold.
  • use_ambig_nt: Use an ambiguous nt character if none of the possible nt at an alignment position has a frequency above the defined threshold.
Returns

consensus sequence
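
A short sketch of the thresholding behaviour, reusing `aln` from the constructor example above:

    cons = aln.get_consensus()                 # plain majority consensus
    strict = aln.get_consensus(threshold=0.9)  # ambiguous positions become 'N' (nt) or 'X' (AA)
    codes = aln.get_consensus(threshold=0.9, use_ambig_nt=True)  # nt alignments: ambiguity codes instead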

def get_conserved_orfs( self, min_length: int = 100, identity_cutoff: float | None = None) -> dict:
376    def get_conserved_orfs(self, min_length: int = 100, identity_cutoff: float | None = None) -> dict:
377        """
378        **conserved ORF definition:**
379            - conserved starts and stops
380            - start, stop must be on the same frame
381            - stop - start must be at least min_length
382            - all ungapped seqs[start:stop] must have at least min_length
383            - no ungapped seq can have a stop codon between start and stop
384
385        Conservation is measured as the number of positions with identical characters divided by
386        the length of the ORF slice of the alignment.
387
388        **Algorithm overview:**
389            - check for conserved start and stop codons
390            - iterate over all three frames
391            - check each start and next sufficiently far away stop codon
392            - check if all ungapped seqs between start and stop codon are >= min_length
393            - check if no ungapped seq in the alignment has a stop codon
394            - write to dictionary
395            - classify as internal if the stop codon has already been written with a prior start
396            - repeat for reverse complement
397
398        :return: ORF positions and internal ORF positions
399        """
400
401        # helper functions
402        def determine_conserved_start_stops(alignment: dict, alignment_length: int) -> tuple:
403            """
404            Determine all start and stop codons within an alignment.
405            :param alignment: alignment
406            :param alignment_length: length of alignment
407            :return: start and stop codon positions
408            """
409            starts = config.START_CODONS[self.aln_type]
410            stops = config.STOP_CODONS[self.aln_type]
411
412            list_of_starts, list_of_stops = [], []
413            ref = alignment[list(alignment.keys())[0]]
414            for nt_position in range(alignment_length):
415                if ref[nt_position:nt_position + 3] in starts:
416                    conserved_start = True
417                    for sequence in alignment:
418                        if not alignment[sequence][nt_position:].replace('-', '')[0:3] in starts:
419                            conserved_start = False
420                            break
421                    if conserved_start:
422                        list_of_starts.append(nt_position)
423
424                if ref[nt_position:nt_position + 3] in stops:
425                    conserved_stop = True
426                    for sequence in alignment:
427                        if not alignment[sequence][nt_position:].replace('-', '')[0:3] in stops:
428                            conserved_stop = False
429                            break
430                    if conserved_stop:
431                        list_of_stops.append(nt_position)
432
433            return list_of_starts, list_of_stops
434
435        def get_ungapped_sliced_seqs(alignment: dict, start_pos: int, stop_pos: int) -> list:
436            """
437            slice sequences from start to stop codon and eliminate gaps
438            :param alignment: alignment
439            :param start_pos: start codon
440            :param stop_pos: stop codon
441            :return: sliced sequences
442            """
443            ungapped_seqs = []
444            for seq_id in alignment:
445                ungapped_seqs.append(alignment[seq_id][start_pos:stop_pos + 3].replace('-', ''))
446
447            return ungapped_seqs
448
449        def additional_stops(ungapped_seqs: list) -> bool:
450            """
451            Checks for the presence of a stop codon
452            :param ungapped_seqs: list of ungapped sequences
453            :return: Additional stop codons (True/False)
454            """
455            stops = config.STOP_CODONS[self.aln_type]
456
457            for sliced_seq in ungapped_seqs:
458                for position in range(0, len(sliced_seq) - 3, 3):
459                    if sliced_seq[position:position + 3] in stops:
460                        return True
461            return False
462
463        def calculate_identity(identity_matrix: ndarray, aln_slice:list) -> float:
464            sliced_array = identity_matrix[:,aln_slice[0]:aln_slice[1]] + 1  # identical = 0, different = -1 --> add 1
465            return np.sum(np.all(sliced_array == 1, axis=0))/(aln_slice[1] - aln_slice[0]) * 100
466
467        # checks for arguments
468        if self.aln_type == 'AA':
469            raise TypeError('ORF search only for RNA/DNA alignments')
470
471        if identity_cutoff is not None:
472            if identity_cutoff > 100 or identity_cutoff < 0:
473                raise ValueError('conservation cutoff must be between 0 and 100')
474
475        if min_length <= 6 or min_length > self.length:
476            raise ValueError(f'min_length must be greater than 6 and at most {self.length}')
477
478        # ini
479        identities = self.calc_identity_alignment()
480        alignments = [self.alignment, self.calc_reverse_complement_alignment()]
481        aln_len = self.length
482
483        orf_counter = 0
484        orf_dict = {}
485
486        for aln, direction in zip(alignments, ['+', '-']):
487            # check for starts and stops in the first seq and then check if these are present in all seqs
488            conserved_starts, conserved_stops = determine_conserved_start_stops(aln, aln_len)
489            # check each frame
490            for frame in (0, 1, 2):
491                potential_starts = [x for x in conserved_starts if x % 3 == frame]
492                potential_stops = [x for x in conserved_stops if x % 3 == frame]
493                last_stop = -1
494                for start in potential_starts:
495                    # go to the next stop that is sufficiently far away in the alignment
496                    next_stops = [x for x in potential_stops if x + 3 >= start + min_length]
497                    if not next_stops:
498                        continue
499                    next_stop = next_stops[0]
500                    ungapped_sliced_seqs = get_ungapped_sliced_seqs(aln, start, next_stop)
501                    # re-check the lengths of all ungapped seqs
502                    ungapped_seq_lengths = [len(x) >= min_length for x in ungapped_sliced_seqs]
503                    if not all(ungapped_seq_lengths):
504                        continue
505                    # if no stop codon between start and stop --> write to dictionary
506                    if not additional_stops(ungapped_sliced_seqs):
507                        if direction == '+':
508                            positions = [start, next_stop + 3]
509                        else:
510                            positions = [aln_len - next_stop - 3, aln_len - start]
511                        if last_stop != next_stop:
512                            last_stop = next_stop
513                            conservation = calculate_identity(identities, positions)
514                            if identity_cutoff is not None and conservation < identity_cutoff:
515                                continue
516                            orf_dict[f'ORF_{orf_counter}'] = {'location': [positions],
517                                                              'frame': frame,
518                                                              'strand': direction,
519                                                              'conservation': conservation,
520                                                              'internal': []
521                                                              }
522                            orf_counter += 1
523                        else:
524                            if orf_dict:
525                                orf_dict[f'ORF_{orf_counter - 1}']['internal'].append(positions)
526
527        return orf_dict

conserved ORF definition:
  • conserved starts and stops
  • start, stop must be on the same frame
  • stop - start must be at least min_length
  • all ungapped seqs[start:stop] must have at least min_length
  • no ungapped seq can have a stop codon between start and stop

Conservation is measured as the number of positions with identical characters divided by the length of the ORF slice of the alignment.

Algorithm overview:
  • check for conserved start and stop codons
  • iterate over all three frames
  • check each start and the next sufficiently far away stop codon
  • check if all ungapped seqs between start and stop codon are >= min_length
  • check if no ungapped seq in the alignment has a stop codon
  • write to dictionary
  • classify as internal if the stop codon has already been written with a prior start
  • repeat for reverse complement

Returns

ORF positions and internal ORF positions
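
A usage sketch, assuming `aln` wraps a nucleotide alignment long enough to contain ORFs of the requested size:

    orfs = aln.get_conserved_orfs(min_length=300, identity_cutoff=95.0)
    for name, orf in orfs.items():
        print(name, orf['location'], orf['strand'], orf['frame'], round(orf['conservation'], 1))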

def get_non_overlapping_conserved_orfs(self, min_length: int = 100, identity_cutoff: float = None) -> dict:
529    def get_non_overlapping_conserved_orfs(self, min_length: int = 100, identity_cutoff:float = None) -> dict:
530        """
531        First calculates all ORFs and then searches from 5'
532        all non-overlapping orfs in the fw strand and from the
533        3' all non-overlapping orfs in the rw strand.
534
535        **No overlap algorithm:**
536            **frame 1:** -[M------*]--- ----[M--*]---------[M-----
537
538            **frame 2:** -------[M------*]---------[M---*]--------
539
540            **frame 3:** [M---*]-----[M----------*]----------[M---
541
542            **results:** [M---*][M------*]--[M--*]-[M---*]-[M-----
543
544            frame:    3      2           1      2       1
545
546        :return: dictionary with non-overlapping orfs
547        """
548        orf_dict = self.get_conserved_orfs(min_length, identity_cutoff)
549
550        fw_orfs, rw_orfs = [], []
551
552        for orf in orf_dict:
553            if orf_dict[orf]['strand'] == '+':
554                fw_orfs.append((orf, orf_dict[orf]['location'][0]))
555            else:
556                rw_orfs.append((orf, orf_dict[orf]['location'][0]))
557
558        fw_orfs.sort(key=lambda x: x[1][0])  # sort by start pos
559        rw_orfs.sort(key=lambda x: x[1][1], reverse=True)  # sort by stop pos
560        non_overlapping_orfs = []
561        for orf_list, strand in zip([fw_orfs, rw_orfs], ['+', '-']):
562            previous_stop = -1 if strand == '+' else self.length + 1
563            for orf in orf_list:
564                if strand == '+' and orf[1][0] > previous_stop:
565                    non_overlapping_orfs.append(orf[0])
566                    previous_stop = orf[1][1]
567                elif strand == '-' and orf[1][1] < previous_stop:
568                    non_overlapping_orfs.append(orf[0])
569                    previous_stop = orf[1][0]
570
571        non_overlap_dict = {}
572        for orf in orf_dict:
573            if orf in non_overlapping_orfs:
574                non_overlap_dict[orf] = orf_dict[orf]
575
576        return non_overlap_dict

First calculates all ORFs and then searches from 5' all non-overlapping orfs in the fw strand and from the 3' all non-overlapping orfs in the rw strand.

No overlap algorithm:

**frame 1:** -[M------*]--- ----[M--*]---------[M-----

**frame 2:** -------[M------*]---------[M---*]--------

**frame 3:** [M---*]-----[M----------*]----------[M---

**results:** [M---*][M------*]--[M--*]-[M---*]-[M-----

frame:    3      2           1      2       1
Returns

dictionary with non-overlapping orfs
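
Usage mirrors get_conserved_orfs (sketch, same assumptions as above):

    nov = aln.get_non_overlapping_conserved_orfs(min_length=300)
    print(f'{len(nov)} non-overlapping ORFs kept')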

def calc_length_stats(self) -> dict:
578    def calc_length_stats(self) -> dict:
579        """
580        Determine the stats for the length of the ungapped seqs in the alignment.
581        :return: dictionary with length stats
582        """
583
584        seq_lengths = [len(self.alignment[x].replace('-', '')) for x in self.alignment]
585
586        return {'number of seq': len(self.alignment),
587                'mean length': float(np.mean(seq_lengths)),
588                'std length': float(np.std(seq_lengths)),
589                'min length': int(np.min(seq_lengths)),
590                'max length': int(np.max(seq_lengths))
591                }

Determine the stats for the length of the ungapped seqs in the alignment.

Returns

dictionary with length stats
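
For example (values depend on the input alignment; the keys are those defined in the source above):

    stats = aln.calc_length_stats()
    print(stats['number of seq'], stats['mean length'], stats['min length'], stats['max length'])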

def calc_entropy(self) -> list:
593    def calc_entropy(self) -> list:
594        """
595        Calculate the normalized Shannon's entropy for every position in an alignment:
596
597        - 1: high entropy
598        - 0: low entropy
599
600        :return: Entropies at each position.
601        """
602
603        # helper functions
604        def shannons_entropy(character_list: list, states: int, aln_type: str) -> float:
605            """
606            Calculate Shannon's entropy of an alignment column,
607            normalized between 0 and 1.
608            :param character_list: characters at an alignment position
609            :param states: number of potential characters that can be present
610            :param aln_type: type of the alignment
611            :returns: entropy
612            """
613            ent, n_chars = 0, len(character_list)
614            # only one char is in the list
615            if n_chars <= 1:
616                return ent
617            # calculate the number of unique chars and their counts
618            chars, char_counts = np.unique(character_list, return_counts=True)
619            char_counts = char_counts.astype(float)
620            # ignore gaps for entropy calc
621            char_counts, chars = char_counts[chars != "-"], chars[chars != "-"]
622            # correctly handle ambiguous chars
623            index_to_drop = []
624            for index, char in enumerate(chars):
625                if char in config.AMBIG_CHARS[aln_type]:
626                    index_to_drop.append(index)
627                    amb_chars, amb_counts = np.unique(config.AMBIG_CHARS[aln_type][char], return_counts=True)
628                    amb_counts = amb_counts / len(config.AMBIG_CHARS[aln_type][char])
629                    # add the proportionate numbers to initial array
630                    for amb_char, amb_count in zip(amb_chars, amb_counts):
631                        if amb_char in chars:
632                            char_counts[chars == amb_char] += amb_count
633                        else:
634                            chars, char_counts = np.append(chars, amb_char), np.append(char_counts, amb_count)
635            # drop the ambiguous characters from array
636            char_counts, chars = np.delete(char_counts, index_to_drop), np.delete(chars, index_to_drop)
637            # calc the entropy
638            probs = char_counts / n_chars
639            if np.count_nonzero(probs) <= 1:
640                return ent
641            for prob in probs:
642                ent -= prob * math.log(prob, states)
643
644            return ent
645
646        aln = self.alignment
647        entropys = []
648
649        if self.aln_type == 'AA':
650            states = 20
651        else:
652            states = 4
653        # iterate over alignment positions and the sequences
654        for nuc_pos in range(self.length):
655            pos = []
656            for record in aln:
657                pos.append(aln[record][nuc_pos])
658            entropys.append(shannons_entropy(pos, states, self.aln_type))
659
660        return entropys

Calculate the normalized Shannon's entropy for every position in an alignment:

  • 1: high entropy
  • 0: low entropy
Returns

Entropies at each position.
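
A sketch for flagging variable columns (the 0.5 cutoff is arbitrary, chosen for illustration):

    entropies = aln.calc_entropy()
    variable_cols = [i for i, e in enumerate(entropies) if e > 0.5]
    print(f'{len(variable_cols)} of {aln.length} columns above 0.5 normalized entropy')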

def calc_gc(self) -> list | TypeError:
662    def calc_gc(self) -> list | TypeError:
663        """
664        Determine the GC content for every position in an nt alignment.
665        :return: GC content for every position.
666        :raises: TypeError for AA alignments
667        """
668        if self.aln_type == 'AA':
669            raise TypeError("GC computation is not possible for aminoacid alignment")
670
671        gc, aln, amb_nucs = [], self.alignment, config.AMBIG_CHARS[self.aln_type]
672
673        for position in range(self.length):
674            nucleotides = str()
675            for record in aln:
676                nucleotides = nucleotides + aln[record][position]
677            # ini dict with chars that occur and which ones to
678            # count in which freq
679            to_count = {
680                'G': 1,
681                'C': 1,
682            }
683            # handle ambig. nuc
684            for char in amb_nucs:
685                if char in nucleotides:
686                    to_count[char] = (amb_nucs[char].count('C') + amb_nucs[char].count('G')) / len(amb_nucs[char])
687
688            gc.append(
689                sum([nucleotides.count(x) * to_count[x] for x in to_count]) / len(nucleotides)
690            )
691
692        return gc

Determine the GC content for every position in an nt alignment.

Returns

GC content for every position.

Raises
  • TypeError for AA alignments
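
Sketch (nucleotide alignments only; AA input raises TypeError as noted above):

    gc = aln.calc_gc()
    print(f'mean GC content: {sum(gc) / len(gc):.2%}')
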
def calc_coverage(self) -> list:
694    def calc_coverage(self) -> list:
695        """
696        Determine the coverage of every position in an alignment.
697        This is defined as:
698            1 - (number of '-' characters at the position / number of sequences)
699
700        :return: Coverage at each alignment position.
701        """
702        coverage, aln = [], self.alignment
703
704        for nuc_pos in range(self.length):
705            pos = str()
706            for record in aln.keys():
707                pos = pos + aln[record][nuc_pos]
708            coverage.append(1 - pos.count('-') / len(pos))
709
710        return coverage

Determine the coverage of every position in an alignment. This is defined as: 1 - (number of '-' characters at the position / number of sequences)

Returns

Coverage at each alignment position.
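
A short sketch for spotting gap-rich columns (the 0.5 cutoff is illustrative):

    coverage = aln.calc_coverage()
    gappy_cols = [i for i, c in enumerate(coverage) if c < 0.5]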

def calc_reverse_complement_alignment(self) -> dict | TypeError:
712    def calc_reverse_complement_alignment(self) -> dict | TypeError:
713        """
714        Reverse complement the alignment.
715        :return: Alignment (rv)
716        """
717        if self.aln_type == 'AA':
718            raise TypeError('Reverse complement only for RNA or DNA.')
719
720        aln = self.alignment
721        reverse_complement_dict = {}
722
723        for seq_id in aln:
724            reverse_complement_dict[seq_id] = ''.join(config.COMPLEMENT[base] for base in reversed(aln[seq_id]))
725
726        return reverse_complement_dict

Reverse complement the alignment.

Returns

Alignment (rv)
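
Sketch (nucleotide alignments only):

    rc = aln.calc_reverse_complement_alignment()
    assert set(rc) == set(aln.alignment)  # same ids, sequences reversed and complemented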

def calc_numerical_alignment(self, encode_mask: bool = False, encode_ambiguities: bool = False):
728    def calc_numerical_alignment(self, encode_mask:bool=False, encode_ambiguities:bool=False):
729        """
730        Transforms the alignment to numerical values. Ambiguities are encoded as -3, mask as -2 and the
731        remaining chars with the idx + 1 of config.CHAR_COLORS[self.aln_type]['standard'].
732
733        :param encode_ambiguities: encode ambiguities as -3
734        :param encode_mask: encode mask as -2
735        :return: numerical matrix
736        """
737
738        aln = self.alignment
739        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
740        # ini matrix
741        numerical_matrix = np.full(sequences.shape, np.nan, dtype=float)
742        # first encode mask
743        if encode_mask:
744            if self.aln_type == 'AA':
745                is_n_or_x = np.isin(sequences, ['X'])
746            else:
747                is_n_or_x = np.isin(sequences, ['N'])
748            numerical_matrix[is_n_or_x] = -2
749        # next encode ambig chars
750        if encode_ambiguities:
751            numerical_matrix[np.isin(sequences, [key for key in config.AMBIG_CHARS[self.aln_type] if key not in ['N', 'X', '-']])] = -3
752        # next convert each char into their respective values
753        for idx, char in enumerate(config.CHAR_COLORS[self.aln_type]['standard']):
754            numerical_matrix[np.isin(sequences, [char])] = idx + 1
755
756        return numerical_matrix

Transforms the alignment to numerical values. Ambiguities are encoded as -3, mask as -2 and the remaining chars with the idx + 1 of config.CHAR_COLORS[self.aln_type]['standard'].

Parameters
  • encode_ambiguities: encode ambiguities as -3
  • encode_mask: encode mask as -2
Returns

numerical matrix
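
A sketch of inspecting the encoding (the exact positive values depend on the character order in config.CHAR_COLORS):

    import numpy as np

    num = aln.calc_numerical_alignment(encode_mask=True, encode_ambiguities=True)
    print(num.shape)                       # (number of sequences, alignment length)
    print(np.unique(num[~np.isnan(num)]))  # e.g. array([-3., -2., 1., 2., ...])
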
def calc_identity_alignment( self, encode_mismatches: bool = True, encode_mask: bool = False, encode_gaps: bool = True, encode_ambiguities: bool = False, encode_each_mismatch_char: bool = False) -> numpy.ndarray:
758    def calc_identity_alignment(self, encode_mismatches:bool=True, encode_mask:bool=False, encode_gaps:bool=True, encode_ambiguities:bool=False, encode_each_mismatch_char:bool=False) -> np.ndarray:
759        """
760        Converts alignment to identity array (identical=0) compared to majority consensus or reference:\n
761
762        :param encode_mismatches: encode mismatch as -1
763        :param encode_mask: encode mask with value=-2 --> also in the reference
764        :param encode_gaps: encode gaps with np.nan --> also in the reference
765        :param encode_ambiguities: encode ambiguities with value=-3
766        :param encode_each_mismatch_char: for each mismatch encode characters separately - these values represent the idx+1 values of config.CHAR_COLORS[self.aln_type]['standard']
767        :return: identity alignment
768        """
769
770        aln = self.alignment
771        ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus()
772
773        # convert alignment to array
774        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
775        reference = np.array(list(ref))
776        # ini matrix
777        identity_matrix = np.full(sequences.shape, 0, dtype=float)
778
779        is_identical = sequences == reference
780
781        if encode_gaps:
782            is_gap = sequences == '-'
783        else:
784            is_gap = np.full(sequences.shape, False)
785
786        if encode_mask:
787            if self.aln_type == 'AA':
788                is_n_or_x = np.isin(sequences, ['X'])
789            else:
790                is_n_or_x = np.isin(sequences, ['N'])
791        else:
792            is_n_or_x = np.full(sequences.shape, False)
793
794        if encode_ambiguities:
795            is_ambig = np.isin(sequences, [key for key in config.AMBIG_CHARS[self.aln_type] if key not in ['N', 'X', '-']])
796        else:
797            is_ambig = np.full(sequences.shape, False)
798
799        if encode_mismatches:
800            is_mismatch = ~is_gap & ~is_identical & ~is_n_or_x & ~is_ambig
801        else:
802            is_mismatch = np.full(sequences.shape, False)
803
804        # encode every different character
805        if encode_each_mismatch_char:
806            for idx, char in enumerate(config.CHAR_COLORS[self.aln_type]['standard']):
807                new_encoding = np.isin(sequences, [char]) & is_mismatch
808                identity_matrix[new_encoding] = idx + 1
809        # or encode different with a single value
810        else:
811            identity_matrix[is_mismatch] = -1  # mismatch
812
813        identity_matrix[is_gap] = np.nan  # gap
814        identity_matrix[is_n_or_x] = -2  # 'N' or 'X'
815        identity_matrix[is_ambig] = -3  # ambiguities
816
817        return identity_matrix

Converts alignment to identity array (identical=0) compared to majority consensus or reference:

Parameters
  • encode_mismatches: encode mismatch as -1
  • encode_mask: encode mask with value=-2 --> also in the reference
  • encode_gaps: encode gaps with np.nan --> also in the reference
  • encode_ambiguities: encode ambiguities with value=-3
  • encode_each_mismatch_char: for each mismatch encode characters separately - these values represent the idx+1 values of config.CHAR_COLORS[self.aln_type]['standard']
Returns

identity alignment
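
A sketch of reading the encoded array, with the value meanings documented above:

    ident = aln.calc_identity_alignment(encode_mask=True, encode_ambiguities=True)
    # 0 = identical, -1 = mismatch, -2 = mask, -3 = ambiguity, NaN = gap
    n_mismatches = int((ident == -1).sum())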

def calc_similarity_alignment( self, matrix_type: str | None = None, normalize: bool = True) -> numpy.ndarray:
819    def calc_similarity_alignment(self, matrix_type:str|None=None, normalize:bool=True) -> np.ndarray:
820        """
821        Calculate the similarity score between the alignment and the reference sequence, with normalization to highlight
822        differences. The similarity scores are scaled to the range [0, 1] based on the substitution matrix values for the
823        reference residue at each column. Gaps are encoded as np.nan.
824
825        The calculation follows these steps:
826
827        1. **Reference Sequence**: If a reference sequence is provided (via `self.reference_id`), it is used. Otherwise,
828           a consensus sequence is generated to serve as the reference.
829        2. **Substitution Matrix**: The similarity between residues is determined using a substitution matrix, such as
830           BLOSUM65 for amino acids or TRANS for nucleotides (the defaults). The matrix is loaded based on the alignment type.
831        3. **Per-Column Normalization (optional)**:
832
833        For each column in the alignment:
834            - The residue in the reference sequence is treated as the baseline for that column.
835            - The substitution scores for the reference residue are extracted from the substitution matrix.
836            - The scores are normalized to the range [0, 1] using the minimum and maximum possible scores for the reference residue.
837            - This ensures that identical residues (or those with high similarity to the reference) have high scores,
838            while more dissimilar residues have lower scores.
839        4. **Output**:
840
841           - The normalized similarity scores are stored in a NumPy array.
842           - Gaps (if any) or residues not present in the substitution matrix are encoded as `np.nan`.
843
844        :param matrix_type: type of similarity matrix (if not set - AA: BLOSUM65, RNA/DNA: TRANS)
845        :param normalize: whether to normalize the similarity scores to the range [0, 1]
846        :return: A 2D NumPy array where each entry corresponds to the normalized similarity score between the aligned residue
847            and the reference residue for that column. Values range from 0 (low similarity) to 1 (high similarity).
848            Gaps and invalid residues are encoded as `np.nan`.
849        :raises ValueError:
850            If the specified substitution matrix is not available for the given alignment type.
851        """
852
853        aln = self.alignment
854        ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus()
855        if matrix_type is None:
856            if self.aln_type == 'AA':
857                matrix_type = 'BLOSUM65'
858            else:
859                matrix_type = 'TRANS'
860        # load substitution matrix as dictionary
861        try:
862            subs_matrix = config.SUBS_MATRICES[self.aln_type][matrix_type]
863        except KeyError:
864            raise ValueError(
865                f'The specified matrix does not exist for this alignment type.\nAvailable matrices for {self.aln_type} are:\n{list(config.SUBS_MATRICES[self.aln_type].keys())}'
866            )
867
868        # set dtype and convert alignment to a NumPy array for vectorized processing
869        dtype = np.dtype(float, metadata={'matrix': matrix_type})
870        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
871        reference = np.array(list(ref))
872        valid_chars = list(subs_matrix.keys())
873        similarity_array = np.full(sequences.shape, np.nan, dtype=dtype)
874
875        for j, ref_char in enumerate(reference):
876            if ref_char not in valid_chars + ['-']:
877                continue
878            # Get local min and max for the reference residue
879            if normalize and ref_char != '-':
880                local_scores = subs_matrix[ref_char].values()
881                local_min, local_max = min(local_scores), max(local_scores)
882
883            for i, char in enumerate(sequences[:, j]):
884                if char not in valid_chars:
885                    continue
886                # classify the similarity as max if the reference has a gap
887                similarity_score = subs_matrix[char][ref_char] if ref_char != '-' else 1
888                similarity_array[i, j] = (similarity_score - local_min) / (local_max - local_min) if normalize and ref_char != '-' else similarity_score
889
890        return similarity_array

Calculate the similarity score between the alignment and the reference sequence, with normalization to highlight differences. The similarity scores are scaled to the range [0, 1] based on the substitution matrix values for the reference residue at each column. Gaps are encoded as np.nan.

The calculation follows these steps:

  1. Reference Sequence: If a reference sequence is provided (via self.reference_id), it is used. Otherwise, a consensus sequence is generated to serve as the reference.
  2. Substitution Matrix: The similarity between residues is determined using a substitution matrix, such as BLOSUM65 for amino acids or TRANS for nucleotides (the defaults). The matrix is loaded based on the alignment type.
  3. Per-Column Normalization (optional):

For each column in the alignment:
  • The residue in the reference sequence is treated as the baseline for that column.
  • The substitution scores for the reference residue are extracted from the substitution matrix.
  • The scores are normalized to the range [0, 1] using the minimum and maximum possible scores for the reference residue.
  • This ensures that identical residues (or those with high similarity to the reference) have high scores, while more dissimilar residues have lower scores.

  4. Output:

  • The normalized similarity scores are stored in a NumPy array.
  • Gaps (if any) or residues not present in the substitution matrix are encoded as np.nan.
Parameters
  • matrix_type: type of similarity matrix (if not set - AA: BLOSUM65, RNA/DNA: TRANS)
  • normalize: whether to normalize the similarity scores to range [0, 1]
Returns

A 2D NumPy array where each entry corresponds to the normalized similarity score between the aligned residue and the reference residue for that column. Values range from 0 (low similarity) to 1 (high similarity). Gaps and invalid residues are encoded as np.nan.

Raises
  • ValueError: If the specified substitution matrix is not available for the given alignment type.
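
A usage sketch (the defaults fall back to 'BLOSUM65' for AA and 'TRANS' for nucleotide alignments, as in the source above):

    import numpy as np

    sim = aln.calc_similarity_alignment()
    col_means = np.nanmean(sim, axis=0)  # per-column mean similarity, ignoring gaps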

def calc_position_matrix(self, matrix_type: str = 'PWM') -> numpy.ndarray | ValueError:
892    def calc_position_matrix(self, matrix_type:str='PWM') -> np.ndarray | ValueError:
893        """
894        Calculates a position matrix of the specified type for the given alignment. The function
895        supports generating matrices of types Position Frequency Matrix (PFM), Position Probability
896        Matrix (PPM), Position Weight Matrix (PWM), and cumulative Information Content (IC). It validates
897        the provided matrix type and includes pseudo-count adjustments to ensure robust calculations.
898
899        :param matrix_type: Type of position matrix to calculate. Accepted values are 'PFM', 'PPM',
900            'PWM', and 'IC'. Defaults to 'PWM'.
901        :type matrix_type: str
902        :raises ValueError: If the provided `matrix_type` is not one of the accepted values.
903        :return: A numpy array representing the calculated position matrix of the specified type.
904        :rtype: np.ndarray
905        """
906
907        # ini
908        aln = self.alignment
909        if matrix_type not in ['PFM', 'PPM', 'IC', 'PWM']:
910            raise ValueError('Matrix_type must be PFM, PPM, IC or PWM.')
911        possible_chars = list(config.CHAR_COLORS[self.aln_type]['standard'].keys())
912        sequences = np.array([list(aln[seq_id]) for seq_id in list(aln.keys())])
913
914        # calc position frequency matrix
915        pfm = np.array([np.sum(sequences == char, 0) for char in possible_chars])
916        if matrix_type == 'PFM':
917            return pfm
918
919        # calc position probability matrix (probability)
920        pseudo_count = 0.0001  # to avoid 0 values
921        pfm = pfm + pseudo_count
922        ppm_non_char_excluded = pfm/np.sum(pfm, axis=0)  # use this for pwm/ic calculation
923        ppm = pfm/len(aln.keys())  # calculate the frequency based on row number
924        if matrix_type == 'PPM':
925            return ppm
926
927        # calc position weight matrix (log-likelihood)
928        pwm = np.log2(ppm_non_char_excluded * len(possible_chars))
929        if matrix_type == 'PWM':
930            return pwm
931
932        # calc information content per position (in bits) - can be used to scale a ppm for sequence logos
933        ic = np.sum(ppm_non_char_excluded * pwm, axis=0)
934        if matrix_type == 'IC':
935            return ic
936
937        return None

Calculates a position matrix of the specified type for the given alignment. The function supports generating matrices of types Position Frequency Matrix (PFM), Position Probability Matrix (PPM), Position Weight Matrix (PWM), and cumulative Information Content (IC). It validates the provided matrix type and includes pseudo-count adjustments to ensure robust calculations.

Parameters
  • matrix_type: Type of position matrix to calculate. Accepted values are 'PFM', 'PPM', 'PWM', and 'IC'. Defaults to 'PWM'.
Raises
  • ValueError: If the provided matrix_type is not one of the accepted values.
Returns

A numpy array representing the calculated position matrix of the specified type.
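
Sketch of the two most common uses (row order follows config.CHAR_COLORS[aln.aln_type]['standard']):

    pwm = aln.calc_position_matrix('PWM')  # per-character log2 likelihoods
    ic = aln.calc_position_matrix('IC')    # per-position information content, e.g. for sequence logos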

def calc_percent_recovery(self) -> dict:
939    def calc_percent_recovery(self) -> dict:
940        """
941        Recovery per sequence, compared either to the majority consensus seq
942        or to the reference seq.\n
943        Defined as:\n
944
945        `(1 - (count of N/X/- characters at ungapped reference positions) / (number of ungapped reference positions)) * 100`
946
947        This is highly similar to how nextclade calculates recovery over reference.
948
949        :return: dictionary mapping each sequence id to its percent recovery
950        """
951
952        aln = self.alignment
953
954        if self.reference_id is not None:
955            ref = aln[self.reference_id]
956        else:
957            ref = self.get_consensus()  # majority consensus
958
959        if not any(char != '-' for char in ref):
960            raise ValueError("Reference sequence is entirely gapped, cannot calculate recovery.")
961
962
963        # count 'N', 'X' and '-' chars in non-gapped regions
964        recovery_over_ref = dict()
965
966        # Get positions of non-gap characters in the reference
967        non_gap_positions = [i for i, char in enumerate(ref) if char != '-']
968        cumulative_length = len(non_gap_positions)
969
970        # Calculate recovery
971        for seq_id in aln:
972            if seq_id == self.reference_id:
973                continue
974            seq = aln[seq_id]
975            count_invalid = sum(
976                seq[pos] == '-' or
977                (seq[pos] == 'X' if self.aln_type == "AA" else seq[pos] == 'N')
978                for pos in non_gap_positions
979            )
980            recovery_over_ref[seq_id] = (1 - count_invalid / cumulative_length) * 100
981
982        return recovery_over_ref

Recovery per sequence, compared either to the majority consensus sequence or to the reference sequence.

Defined as:

(1 - (count of N/X/- characters at ungapped reference positions) / (number of ungapped reference positions)) * 100

This is highly similar to how nextclade calculates recovery over reference.

Returns

dictionary mapping each sequence id to its percent recovery
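
For illustration, a hedged sketch (the file name and reference id are hypothetical; without `reference_id` the majority consensus is used):

```python
from msaexplorer import explore

# hypothetical file name and reference id
msa = explore.MSA('example.fasta', reference_id='seq1')
recovery = msa.calc_percent_recovery()

for seq_id, percent in recovery.items():
    print(f'{seq_id}: {percent:.1f} % recovered')
```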

def calc_character_frequencies(self) -> dict:
 984    def calc_character_frequencies(self) -> dict:
 985        """
 986        Calculate the character frequencies in the alignment:
 987        The frequencies are counted per sequence and in total. The
 988        percentage of non-gap characters in the alignment is
 989        relative to the total number of non-gap characters.
 990        The gap percentage is relative to the sequence length.
 991
 992        The output is a nested dictionary.
 993
 994        :return: Character frequencies
 995        """
 996
 997        aln, aln_length = self.alignment, self.length
 998
 999        freqs = {'total': {'-': {'counts': 0, '% of alignment': float()}}}
1000
1001        for seq_id in aln:
1002            freqs[seq_id], all_chars = {'-': {'counts': 0, '% of alignment': float()}}, 0
1003            unique_chars = set(aln[seq_id])
1004            for char in unique_chars:
1005                if char == '-':
1006                    continue
1007                # add characters to dictionaries
1008                if char not in freqs[seq_id]:
1009                    freqs[seq_id][char] = {'counts': 0, '% of non-gapped': 0}
1010                if char not in freqs['total']:
1011                    freqs['total'][char] = {'counts': 0, '% of non-gapped': 0}
1012                # count non-gap chars
1013                freqs[seq_id][char]['counts'] += aln[seq_id].count(char)
1014                freqs['total'][char]['counts'] += freqs[seq_id][char]['counts']
1015                all_chars += freqs[seq_id][char]['counts']
1016            # normalize counts
1017            for char in freqs[seq_id]:
1018                if char == '-':
1019                    continue
1020                freqs[seq_id][char]['% of non-gapped'] = freqs[seq_id][char]['counts'] / all_chars * 100
1021                freqs['total'][char]['% of non-gapped'] += freqs[seq_id][char]['% of non-gapped']
1022            # count gaps
1023            freqs[seq_id]['-']['counts'] = aln[seq_id].count('-')
1024            freqs['total']['-']['counts'] += freqs[seq_id]['-']['counts']
1025            # normalize gap counts
1026            freqs[seq_id]['-']['% of alignment'] = freqs[seq_id]['-']['counts'] / aln_length * 100
1027            freqs['total']['-']['% of alignment'] += freqs[seq_id]['-']['% of alignment']
1028
1029        # normalize the total counts
1030        for char in freqs['total']:
1031            for value in freqs['total'][char]:
1032                if value == '% of alignment' or value == '% of non-gapped':
1033                    freqs['total'][char][value] = freqs['total'][char][value] / len(aln)
1034
1035        return freqs

Calculate the character frequencies in the alignment: the frequencies are counted per sequence and in total. The percentage of non-gap characters in the alignment is relative to the total number of non-gap characters. The gap percentage is relative to the sequence length.

The output is a nested dictionary.

Returns

Character frequencies
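
A short sketch of how the nested dictionary might be accessed (file name, sequence id and character are hypothetical):

```python
from msaexplorer import explore

msa = explore.MSA('example.fasta')  # hypothetical file name
freqs = msa.calc_character_frequencies()

# gap content averaged over all sequences
print(freqs['total']['-']['% of alignment'])

# counts of one character in one sequence (hypothetical id)
print(freqs['seq1']['A']['counts'])
```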

def calc_pairwise_identity_matrix(self, distance_type: str = 'ghd') -> numpy.ndarray:
1037    def calc_pairwise_identity_matrix(self, distance_type: str = 'ghd') -> ndarray:
1038        """
1039        Calculate pairwise identities for an alignment. As there are different definitions of sequence identity, several options are implemented:
1040
1041        **1) ghd (global hamming distance)**: At each alignment position, check if characters match:
1042        \ndistance = matches / alignment_length * 100
1043
1044        **2) lhd (local hamming distance)**: Restrict the alignment to the region that is not flanked by gaps in either sequence:
1045        \ndistance = matches / min(5'3' ungapped seq1, 5'3' ungapped seq2) * 100
1046
1047        **3) ged (gap excluded distance)**: All gaps are excluded from the alignment.
1048        \ndistance = matches / (matches + mismatches) * 100
1049
1050        **4) gcd (gap compressed distance)**: All consecutive gaps are compressed to one mismatch.
1051        \ndistance = matches / gap_compressed_alignment_length * 100
1052        :param distance_type: type of distance to compute: 'ghd', 'lhd', 'ged' or 'gcd' (default: 'ghd')
1053        :return: matrix of pairwise identity scores in percent.
1054        """
1055
1056        def hamming_distance(seq1: str, seq2: str) -> int:
1057            return sum(c1 == c2 for c1, c2 in zip(seq1, seq2))
1058
1059        def ghd(seq1: str, seq2: str) -> float:
1060            return hamming_distance(seq1, seq2) / self.length * 100
1061
1062        def lhd(seq1, seq2):
1063            # Trim gaps from both sides
1064            i, j = 0, self.length - 1
1065            while i < self.length and (seq1[i] == '-' or seq2[i] == '-'):
1066                i += 1
1067            while j >= 0 and (seq1[j] == '-' or seq2[j] == '-'):
1068                j -= 1
1069            if i > j:
1070                return 0.0
1071
1072            seq1_, seq2_ = seq1[i:j + 1], seq2[i:j + 1]
1073            matches = sum(c1 == c2 for c1, c2 in zip(seq1_, seq2_))
1074            length = j - i + 1
1075            return (matches / length) * 100 if length > 0 else 0.0
1076
1077        def ged(seq1: str, seq2: str) -> float:
1078
1079            matches, mismatches = 0, 0
1080
1081            for c1, c2 in zip(seq1, seq2):
1082                if c1 != '-' and c2 != '-':
1083                    if c1 == c2:
1084                        matches += 1
1085                    else:
1086                        mismatches += 1
1087            return matches / (matches + mismatches) * 100 if (matches + mismatches) > 0 else 0
1088
1089        def gcd(seq1: str, seq2: str) -> float:
1090            matches = 0
1091            mismatches = 0
1092            in_gap = False
1093
1094            for char1, char2 in zip(seq1, seq2):
1095                if char1 == '-' and char2 == '-':  # Shared gap: do nothing
1096                    continue
1097                elif char1 == '-' or char2 == '-':  # Gap in only one sequence
1098                    if not in_gap:  # Start of a new gap stretch
1099                        mismatches += 1
1100                        in_gap = True
1101                else:  # No gaps
1102                    in_gap = False
1103                    if char1 == char2:  # Matching characters
1104                        matches += 1
1105                    else:  # Mismatched characters
1106                        mismatches += 1
1107
1108            return matches / (matches + mismatches) * 100 if (matches + mismatches) > 0 else 0
1109
1110
1111        # Map distance type to corresponding function
1112        distance_functions: Dict[str, Callable[[str, str], float]] = {
1113            'ghd': ghd,
1114            'lhd': lhd,
1115            'ged': ged,
1116            'gcd': gcd
1117        }
1118
1119        if distance_type not in distance_functions:
1120            raise ValueError(f"Invalid distance type '{distance_type}'. Choose from {list(distance_functions.keys())}.")
1121
1122        # Compute pairwise distances
1123        aln = self.alignment
1124        distance_func = distance_functions[distance_type]
1125        distance_matrix = np.zeros((len(aln), len(aln)))
1126
1127        sequences = list(aln.values())
1128        n = len(sequences)
1129        for i in range(n):
1130            seq1 = sequences[i]
1131            for j in range(i, n):
1132                seq2 = sequences[j]
1133                dist = distance_func(seq1, seq2)
1134                distance_matrix[i, j] = dist
1135                distance_matrix[j, i] = dist
1136
1137        return distance_matrix

Calculate pairwise identities for an alignment. As there are different definitions of sequence identity, several options are implemented:

1) ghd (global hamming distance): At each alignment position, check if characters match:

distance = matches / alignment_length * 100

2) lhd (local hamming distance): Restrict the alignment to the region that is not flanked by gaps in either sequence:

distance = matches / min(5'3' ungapped seq1, 5'3' ungapped seq2) * 100

3) ged (gap excluded distance): All gaps are excluded from the alignment.

distance = matches / (matches + mismatches) * 100

4) gcd (gap compressed distance): All consecutive gaps are compressed to one mismatch.

distance = matches / gap_compressed_alignment_length * 100

Parameters
  • distance_type: type of distance to compute: 'ghd', 'lhd', 'ged' or 'gcd' (default: 'ghd')
Returns

matrix of pairwise identity scores in percent.
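
A hedged usage sketch (the file name is hypothetical); the returned matrix is symmetric, with one row and column per sequence:

```python
import numpy as np
from msaexplorer import explore

msa = explore.MSA('example.fasta')  # hypothetical file name
identity = msa.calc_pairwise_identity_matrix(distance_type='ged')

# square matrix of percent identities, symmetric by construction
assert identity.shape == (len(msa.alignment), len(msa.alignment))
assert np.allclose(identity, identity.T)
```
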
def get_snps(self, include_ambig: bool = False) -> dict:
1139    def get_snps(self, include_ambig: bool = False) -> dict:
1140        """
1141        Calculate SNPs similarly to snp-sites (output is comparable):
1142        https://github.com/sanger-pathogens/snp-sites
1143        Importantly, a SNP position is only considered if at least one variant is not an ambiguous character.
1144        The SNPs are compared to a majority consensus sequence or to a reference if it has been set.
1145
1146        :param include_ambig: Include ambiguous SNPs (default: False)
1147        :return: dictionary containing snp positions and their variants including their frequency.
1148        """
1149        aln = self.alignment
1150        ref = aln[self.reference_id] if self.reference_id is not None else self.get_consensus()
1151        aln = {x: aln[x] for x in aln.keys() if x != self.reference_id}
1152        seq_ids = list(aln.keys())
1153        snp_dict = {'#CHROM': self.reference_id if self.reference_id is not None else 'consensus', 'POS': {}}
1154
1155        for pos in range(self.length):
1156            reference_char = ref[pos]
1157            if not include_ambig:
1158                if reference_char in config.AMBIG_CHARS[self.aln_type] and reference_char != '-':
1159                    continue
1160            alt_chars, snps = [], []
1161            for i, seq_id in enumerate(aln.keys()):
1162                alt_chars.append(aln[seq_id][pos])
1163                if reference_char != aln[seq_id][pos]:
1164                    snps.append(i)
1165            if not snps:
1166                continue
1167            if include_ambig:
1168                if all(alt_chars[x] in config.AMBIG_CHARS[self.aln_type] for x in snps):
1169                    continue
1170            else:
1171                snps = [x for x in snps if alt_chars[x] not in config.AMBIG_CHARS[self.aln_type]]
1172                if not snps:
1173                    continue
1174            if pos not in snp_dict['POS']:
1175                snp_dict['POS'][pos] = {'ref': reference_char, 'ALT': {}}
1176            for snp in snps:
1177                if alt_chars[snp] not in snp_dict['POS'][pos]['ALT']:
1178                    snp_dict['POS'][pos]['ALT'][alt_chars[snp]] = {
1179                        'AF': 1,
1180                        'SEQ_ID': [seq_ids[snp]]
1181                    }
1182                else:
1183                    snp_dict['POS'][pos]['ALT'][alt_chars[snp]]['AF'] += 1
1184                    snp_dict['POS'][pos]['ALT'][alt_chars[snp]]['SEQ_ID'].append(seq_ids[snp])
1185            # calculate AF
1186            if pos in snp_dict['POS']:
1187                for alt in snp_dict['POS'][pos]['ALT']:
1188                    snp_dict['POS'][pos]['ALT'][alt]['AF'] /= len(aln)
1189
1190        return snp_dict

Calculate SNPs similarly to snp-sites (output is comparable): https://github.com/sanger-pathogens/snp-sites. Importantly, a SNP position is only considered if at least one variant is not an ambiguous character. The SNPs are compared to a majority consensus sequence or to a reference if it has been set.

Parameters
  • include_ambig: Include ambiguous SNPs (default: False)
Returns

dictionary containing SNP positions and their variants, including their frequency.
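
A sketch of iterating over the result (the file name is hypothetical; the key layout follows the code above):

```python
from msaexplorer import explore

msa = explore.MSA('example.fasta')  # hypothetical file name
snps = msa.get_snps()

print('compared against:', snps['#CHROM'])
for pos, info in snps['POS'].items():
    for alt, stats in info['ALT'].items():
        print(pos, info['ref'], '->', alt, f"AF={stats['AF']:.2f}", stats['SEQ_ID'])
```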

def calc_transition_transversion_score(self) -> list:
1192    def calc_transition_transversion_score(self) -> list:
1193        """
1194        Based on the SNP positions, calculates a transition/transversion score.
1195        A positive score means a higher ratio of transitions and a negative score
1196        means a higher ratio of transversions.
1197        :return: list of per-position scores
1198        """
1199
1200        if self.aln_type == 'AA':
1201            raise TypeError('TS/TV scoring only for RNA/DNA alignments')
1202
1203        # ini
1204        snps = self.get_snps()
1205        score = [0]*self.length
1206
1207        for pos in snps['POS']:
1208            # check each alternative allele at this position
1209            for alt in snps['POS'][pos]['ALT']:
1210                # check the type of substitution
1211                if snps['POS'][pos]['ref'] + alt in ['AG', 'GA', 'CT', 'TC', 'CU', 'UC']:
1212                    score[pos] += snps['POS'][pos]['ALT'][alt]['AF']
1213                else:
1214                    score[pos] -= snps['POS'][pos]['ALT'][alt]['AF']
1215
1216        return score

Based on the SNP positions, calculates a transition/transversion score. A positive score means a higher ratio of transitions and a negative score means a higher ratio of transversions.

Returns

list of per-position scores
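
A sketch for a nucleotide alignment (the file name is hypothetical; the method raises TypeError for AA alignments):

```python
from msaexplorer import explore

msa = explore.MSA('example.fasta')  # hypothetical nucleotide alignment
scores = msa.calc_transition_transversion_score()

# positive scores indicate a transition bias, negative a transversion bias
biased_positions = [(pos, s) for pos, s in enumerate(scores) if s != 0]
print(biased_positions[:10])
```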

class Annotation:
1219class Annotation:
1220    """
1221    An annotation class that allows reading in GFF, GenBank or BED files and adjusting feature locations to those of the MSA.
1222    """
1223
1224    def __init__(self, aln: MSA, annotation_path: str):
1225        """
1226        The annotation class. Lets you parse multiple standard formats
1227        that might be used for annotating an alignment. The main purpose
1228        is to parse the annotation file and adapt the locations of the
1229        diverse features to the corresponding positions within the
1230        alignment. Importantly, the IDs of the annotation and the MSA
1231        have to partly match.
1232
1233        :param aln: MSA class
1234        :param annotation_path: path to annotation file (gb, bed, gff) or raw string
1235
1236        """
1237
1238        self.ann_type, self._seq_id, self.locus, self.features = self._parse_annotation(annotation_path, aln)  # read annotation
1239        self._gapped_seq = self._MSA_validation_and_seq_extraction(aln, self._seq_id)  # extract gapped sequence
1240        self._position_map = self._build_position_map()  # build a position map
1241        self._map_to_alignment()  # adapt feature locations
1242
1243    @staticmethod
1244    def _MSA_validation_and_seq_extraction(aln: MSA, seq_id: str) -> str:
1245        """
1246        extract gapped sequence from MSA that corresponds to annotation
1247        :param aln: MSA class
1248        :param seq_id: sequence id to extract
1249        :return: gapped sequence
1250        """
1251        if not isinstance(aln, MSA):
1252            raise ValueError('alignment has to be an MSA class. use explore.MSA() to read in alignment')
1253        else:
1254            return aln._alignment[seq_id]
1255
1256    @staticmethod
1257    def _parse_annotation(annotation_path: str, aln: MSA) -> tuple[str, str, str, Dict]:
1258
1259        def detect_annotation_type(file_path: str) -> str:
1260            """
1261            Detect the type of annotation file (GenBank, GFF, or BED) based
1262            on the first relevant line (excluding empty and #)
1263
1264            :param file_path: Path to the annotation file.
1265            :return: The detected file type ('gb', 'gff', or 'bed').
1266
1267            :raises ValueError: If the file type cannot be determined.
1268            """
1269
1270            with _get_line_iterator(file_path) as file:
1271                for line in file:
1272                    # skip empty lines and comments
1273                    if not line.strip() or line.startswith('#'):
1274                        continue
1275                    # GenBank
1276                    if line.startswith('LOCUS'):
1277                        return 'gb'
1278                    # gff
1279                    if len(line.split('\t')) == 9:
1280                        # Check for expected values
1281                        columns = line.split('\t')
1282                        if columns[6] in ['+', '-', '.'] and re.match(r'^\d+$', columns[3]) and re.match(r'^\d+$',columns[4]):
1283                            return 'gff'
1284                    # BED files are tab-delimited with at least 3 fields: chrom, start, end
1285                    fields = line.split('\t')
1286                    if len(fields) >= 3 and re.match(r'^\d+$', fields[1]) and re.match(r'^\d+$', fields[2]):
1287                        return 'bed'
1288                    # only read in the first line
1289                    break
1290
1291            raise ValueError(
1292                "File type could not be determined. Ensure the file follows a recognized format (GenBank, GFF, or BED).")
1293
1294        def parse_gb(file_path) -> dict:
1295            """
1296            parse a GenBank file into a dictionary - primarily the information
1297            for qualifiers is retained as these will be used for plotting.
1298
1299            :param file_path: path to GenBank file
1300            :return: nested dictionary
1301
1302            """
1303
1304            def sanitize_gb_location(string: str) -> tuple[list, str]:
1305                """
1306                see: https://www.insdc.org/submitting-standards/feature-table/
1307                """
1308                strand = '+'
1309                locations = []
1310                # check the direction of the annotation
1311                if 'complement' in string:
1312                    strand = '-'
1313                # sanitize operators
1314                for operator in ['complement(', 'join(', 'order(']:
1315                    string = string.replace(operator, '')
1316                # strip chars that would otherwise break start/stop splitting -
1317                # note that this discards partial-boundary markers ('<', '>'),
1318                # which may be worth retaining in the future
1319                for char in ['>', '<', ')']:
1320                    string = string.replace(char, '')
1321                # check if we have multiple location e.g. due to splicing
1322                if ',' in string:
1323                    raw_locations = string.split(',')
1324                else:
1325                    raw_locations = [string]
1326                # try to split start and stop
1327                for location in raw_locations:
1328                    for sep in ['..', '.', '^']:
1329                        if sep in location:
1330                            sanitized_locations = [int(x) for x in location.split(sep)]
1331                            sanitized_locations[0] = sanitized_locations[0] - 1  # enforce 0-based starts
1332                            locations.append(sanitized_locations)
1333                            break
1334
1335                return locations, strand
1336
1337
1338            records = {}
1339            with _get_line_iterator(file_path) as file:
1340                record = None
1341                in_features = False
1342                counter_dict = {}
1343                for line in file:
1344                    line = line.rstrip()
1345                    parts = line.split()
1346                    # extract the locus id
1347                    if line.startswith('LOCUS'):
1348                        if record:
1349                            records[record['locus']] = record
1350                        record = {
1351                            'locus': parts[1],
1352                            'features': {}
1353                        }
1354
1355                    elif line.startswith('FEATURES'):
1356                        in_features = True
1357
1358                    # ignore the sequence info
1359                    elif line.startswith('ORIGIN'):
1360                        in_features = False
1361
1362                    # now write useful feature information to dictionary
1363                    elif in_features:
1364                        if not line.strip():
1365                            continue
1366                        if line[5] != ' ':
1367                            location_line = True  # remember that we are in a location for multi-line locations
1368                            feature_type, qualifier = parts[0], parts[1]
1369                            if feature_type not in record['features']:
1370                                record['features'][feature_type] = {}
1371                                counter_dict[feature_type] = 0
1372                            locations, strand = sanitize_gb_location(qualifier)
1373                            record['features'][feature_type][counter_dict[feature_type]] = {
1374                                'location': locations,
1375                                'strand': strand
1376                            }
1377                            counter_dict[feature_type] += 1
1378                        else:
1379                            # edge case for multi-line locations
1380                            if location_line and not line.strip().startswith('/'):
1381                                locations, strand = sanitize_gb_location(parts[0])
1382                                for loc in locations:
1383                                    record['features'][feature_type][counter_dict[feature_type]]['location'].append(loc)
1384                            else:
1385                                location_line = False
1386                                try:
1387                                    qualifier_type, qualifier = parts[0].split('=')
1388                                except ValueError:  # we are in the coding sequence
1389                                    qualifier = qualifier + parts[0]
1390
1391                                qualifier_type, qualifier = qualifier_type.lstrip('/'), qualifier.strip('"')
1392                                last_index = counter_dict[feature_type] - 1
1393                                record['features'][feature_type][last_index][qualifier_type] = qualifier
1394
1395            records[record['locus']] = record
1396
1397            return records
1398
1399        def parse_gff(file_path) -> dict:
1400            """
1401            Parse a GFF3 (General Feature Format) file into a dictionary structure.
1402
1403            :param file_path: path to GFF3 file
1404            :return: nested dictionary
1405
1406            """
1407            records = {}
1408            with _get_line_iterator(file_path) as file:
1409                previous_id, previous_feature = None, None
1410                for line in file:
1411                    if line.startswith('#') or not line.strip():
1412                        continue
1413                    parts = line.strip().split('\t')
1414                    seqid, source, feature_type, start, end, score, strand, phase, attributes = parts
1415                    # ensure that region and source features are not named differently for gff and gb
1416                    if feature_type == 'region':
1417                        feature_type = 'source'
1418                    if seqid not in records:
1419                        records[seqid] = {'locus': seqid, 'features': {}}
1420                    if feature_type not in records[seqid]['features']:
1421                        records[seqid]['features'][feature_type] = {}
1422
1423                    feature_id = len(records[seqid]['features'][feature_type])
1424                    feature = {
1425                        'strand': strand,
1426                    }
1427
1428                    # Parse attributes into key-value pairs
1429                    for attr in attributes.split(';'):
1430                        if '=' in attr:
1431                            key, value = attr.split('=', 1)
1432                            feature[key.strip()] = value.strip()
1433
1434                    # check if feature are the same --> possible splicing
1435                    if previous_id is not None and previous_feature == feature:
1436                        records[seqid]['features'][feature_type][previous_id]['location'].append([int(start)-1, int(end)])
1437                    else:
1438                        records[seqid]['features'][feature_type][feature_id] = feature
1439                        records[seqid]['features'][feature_type][feature_id]['location'] = [[int(start) - 1, int(end)]]
1440                    # set new previous id and features -> new dict as 'location' is pointed in current feature and this
1441                    # is the only key different if next feature has the same entries
1442                    previous_id, previous_feature = feature_id, {key:value for key, value in feature.items() if key != 'location'}
1443
1444            return records
1445
1446        def parse_bed(file_path) -> dict:
1447            """
1448            Parse a BED file into a dictionary structure.
1449
1450            :param file_path: path to BED file
1451            :return: nested dictionary
1452
1453            """
1454            records = {}
1455            with _get_line_iterator(file_path) as file:
1456                for line in file:
1457                    if line.startswith('#') or not line.strip():
1458                        continue
1459                    parts = line.strip().split('\t')
1460                    chrom, start, end, *optional = parts
1461
1462                    if chrom not in records:
1463                        records[chrom] = {'locus': chrom, 'features': {}}
1464                    feature_type = 'region'
1465                    if feature_type not in records[chrom]['features']:
1466                        records[chrom]['features'][feature_type] = {}
1467
1468                    feature_id = len(records[chrom]['features'][feature_type])
1469                    feature = {
1470                        'location': [[int(start), int(end)]],  # BED is already 0-based, half-open
1471                        'strand': '+',  # assume '+' if not present
1472                    }
1473
1474                    # Handle optional columns (name, score, strand) --> ignore 7-12
1475                    if len(optional) >= 1:
1476                        feature['name'] = optional[0]
1477                    if len(optional) >= 2:
1478                        feature['score'] = optional[1]
1479                    if len(optional) >= 3:
1480                        feature['strand'] = optional[2]
1481
1482                    records[chrom]['features'][feature_type][feature_id] = feature
1483
1484            return records
1485
1486        parse_functions: Dict[str, Callable[[str], dict]] = {
1487            'gb': parse_gb,
1488            'bed': parse_bed,
1489            'gff': parse_gff,
1490        }
1491        # determine the annotation content -> should be standard formatted
1492        try:
1493            annotation_type = detect_annotation_type(annotation_path)
1494        except ValueError as err:
1495            raise err
1496
1497        # read in the annotation
1498        annotations = parse_functions[annotation_type](annotation_path)
1499
1500        # sanity check whether one of the annotation ids and alignment ids match
1501        annotation_found = False
1502        for annotation in annotations.keys():
1503            for aln_id in aln.alignment.keys():
1504                aln_id_sanitized = aln_id.split(' ')[0]
1505                # check in both directions
1506                if aln_id_sanitized in annotation or annotation in aln_id_sanitized:
1507                    annotation_found = True
1508                    break
1509            # also break the outer loop so the matched ids are retained
1510            if annotation_found:
1511                break
1512
1513        if not annotation_found:
1514            raise ValueError(f'the annotations of {annotation_path} do not match any ids in the MSA')
1515
1516        # return only the annotation that has been found, the respective type and the seq_id to map to
1517        return annotation_type, aln_id, annotations[annotation]['locus'], annotations[annotation]['features']
1518
1519
1520    def _build_position_map(self) -> Dict[int, int]:
1521        """
1522        build a position map from a sequence.
1523
1524        :return: dictionary mapping ungapped (genomic) positions to gapped alignment positions
1525        """
1526
1527        position_map = {}
1528        genomic_pos = 0
1529        for aln_index, char in enumerate(self._gapped_seq):
1530            if char != '-':
1531                position_map[genomic_pos] = aln_index
1532                genomic_pos += 1
1533        # ensure the last genomic position is included
1534        position_map[genomic_pos] = len(self._gapped_seq)
1535
1536        return position_map
1537
1538
1539    def _map_to_alignment(self):
1540        """
1541        Adjust all feature locations to alignment positions
1542        """
1543
1544        def map_location(position_map: Dict[int, int], locations: list) -> list:
1545            """
1546            Map genomic locations to alignment positions using a precomputed position map.
1547
1548            :param position_map: Mapping of ungapped (genomic) positions to gapped alignment positions
1549            :param locations: List of genomic start and end positions.
1550            :return: List of adjusted alignment positions.
1551            """
1552
1553            aligned_locs = []
1554            for start, end in locations:
1555                try:
1556                    aligned_start = position_map[start]
1557                    aligned_end = position_map[end]
1558                    aligned_locs.append([aligned_start, aligned_end])
1559                except KeyError:
1560                    raise ValueError(f"Positions {start}-{end} lie outside of the position map.")
1561
1562            return aligned_locs
1563
1564        for feature_type, features in self.features.items():
1565            for feature_id, feature_data in features.items():
1566                original_locations = feature_data['location']
1567                aligned_locations = map_location(self._position_map, original_locations)
1568                feature_data['location'] = aligned_locations

An annotation class that allows reading in GFF, GenBank or BED files and adjusting feature locations to those of the MSA.

Annotation(aln: MSA, annotation_path: str)
1224    def __init__(self, aln: MSA, annotation_path: str):
1225        """
1226        The annotation class. Lets you parse multiple standard formats
1227        that might be used for annotating an alignment. The main purpose
1228        is to parse the annotation file and adapt the locations of the
1229        diverse features to the corresponding positions within the
1230        alignment. Importantly, the IDs of the annotation and the MSA
1231        have to partly match.
1232
1233        :param aln: MSA class
1234        :param annotation_path: path to annotation file (gb, bed, gff) or raw string
1235
1236        """
1237
1238        self.ann_type, self._seq_id, self.locus, self.features = self._parse_annotation(annotation_path, aln)  # read annotation
1239        self._gapped_seq = self._MSA_validation_and_seq_extraction(aln, self._seq_id)  # extract gapped sequence
1240        self._position_map = self._build_position_map()  # build a position map
1241        self._map_to_alignment()  # adapt feature locations

The annotation class. Lets you parse multiple standard formats that might be used for annotating an alignment. The main purpose is to parse the annotation file and adapt the locations of the diverse features to the corresponding positions within the alignment. Importantly, the IDs of the annotation and the MSA have to partly match.

Parameters
  • aln: MSA class
  • annotation_path: path to annotation file (gb, bed, gff) or raw string
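
A hedged end-to-end sketch (both file names are hypothetical; the annotation ids must partly match an alignment id):

```python
from msaexplorer import explore

aln = explore.MSA('example.fasta')                    # hypothetical alignment
annotation = explore.Annotation(aln, 'example.gff')   # hypothetical annotation

print(annotation.ann_type, annotation.locus)
# feature locations are now expressed in alignment (gapped) coordinates
for feature_type, features in annotation.features.items():
    for feature_id, data in features.items():
        print(feature_type, data['strand'], data['location'])
```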