msaexplorer.export

Export module

This module lets you export data produced with MSA explorer.

Functions:

  1"""
  2# Export module
  3
  4This module lets you export data produced with MSA explorer.
  5
  6## Functions:
  7"""
  8
  9import os
 10from numpy import ndarray
 11from msaexplorer import config
 12
 13
 14def _check_and_create_path(path: str):
 15    """
 16    Check and create path if it doesn't exist.
 17    :param path: string to file
 18    """
 19    if path is not None:
 20        output_dir = os.path.dirname(path)
 21        if output_dir and not os.path.exists(output_dir):
 22            os.makedirs(output_dir)
 23
 24
 25def snps(snp_dict: dict, format_type: str = 'vcf', path: str | None = None) -> str | None | ValueError:
 26    """
 27    Export a SNP dictionary to a VCF or tabular format. Importantly, the input dictionary has to be in the standard
 28    format that MSAexplorer produces.
 29
 30    :param snp_dict: Dictionary containing SNP positions and variant information.
 31    :param format_type: Format type ('vcf' or 'tabular'). Default is 'vcf'.
 32    :param path: Path to output VCF or tabular format. (optional)
 33    :return: A string containing the SNP data in the requested format.
 34    :raises ValueError: if the input dictionary is missing required keys or format_type is invalid.
 35    """
 36
 37    def _validate():
 38        if not isinstance(snp_dict, dict):
 39            raise ValueError('Input SNP data must be a dictionary.')
 40        for key in ['#CHROM', 'POS']:
 41            if key not in snp_dict:
 42                raise ValueError(f"Missing required key '{key}' in SNP data.")
 43        if not isinstance(snp_dict['POS'], dict):
 44            raise ValueError('Expected the \'POS\' key to contain a dictionary of positions.')
 45        if format_type not in ['vcf', 'tabular']:
 46            raise ValueError('Invalid format_type.')
 47        _check_and_create_path(path)
 48
 49    def _vcf_format(snp_dict: dict) -> list:
 50        """
 51        Produce  vcf formatted SNP data.
 52        :param snp_dict: dictionary containing SNP positions and variant information.
 53        :return: list of lines to write
 54        """
 55        output_lines = []
 56        # VCF header
 57        output_lines.append('##fileformat=VCFv4.2')
 58        output_lines.append('##source=MSAexplorer')
 59        output_lines.append('#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO')
 60        # process each SNP position in sorted order
 61        for pos in sorted(snp_dict['POS'].keys()):
 62            pos_info = snp_dict['POS'][pos]
 63            ref = pos_info.get('ref', '.')
 64            alt_dict = pos_info.get('ALT', {})
 65            # Create comma-separated list of alternative alleles
 66            alt_alleles = ",".join(alt_dict.keys()) if alt_dict else "."
 67            # Prepare INFO field: include allele frequencies and sequence IDs
 68            afs = []
 69            seq_ids = []
 70            for alt, details in alt_dict.items():
 71                af = details.get('AF', 0)
 72                afs.append(str(af))
 73                seq_ids.append("|".join(details.get('SEQ_ID', [])))
 74            info_fields = []
 75            if afs:
 76                info_fields.append("AF=" + ",".join(afs))
 77            if seq_ids:
 78                info_fields.append("SEQ_ID=" + ",".join(seq_ids))
 79            info = ";".join(info_fields) if info_fields else "."
 80
 81            # VCF is 1-indexed; we assume pos is 0-indexed and add 1
 82            line = f"{snp_dict['#CHROM']}\t{pos + 1}\t.\t{ref}\t{alt_alleles}\t.\t.\t{info}"
 83            output_lines.append(line)
 84
 85        return output_lines
 86
 87    def _tabular_format(snp_dict: dict) -> list:
 88        """
 89        Produce  tabular formatted SNP data.
 90
 91        :param snp_dict: dictionary containing SNP positions and variant information.
 92        :return: list of lines to write
 93        """
 94        output_lines = []
 95        # Create a header for the tabular output
 96        output_lines.append('CHROM\tPOS\tREF\tALT\tAF\tSEQ_ID')
 97
 98        # Process each SNP position and each alternative allele
 99        for pos in sorted(snp_dict['POS'].keys()):
100            pos_info = snp_dict['POS'][pos]
101            ref = pos_info.get('ref', '.')
102            alt_dict = pos_info.get('ALT', {})
103            for alt, details in alt_dict.items():
104                af = details.get('AF', 0)
105                seq_id = ",".join(details.get('SEQ_ID', []))
106                output_lines.append(f"{snp_dict['#CHROM']}\t{pos + 1}\t{ref}\t{alt}\t{af}\t{seq_id}")
107
108        return output_lines
109
110    # validate correct input format
111    _validate()
112
113    # generate line data
114    if format_type == 'vcf':
115        lines = _vcf_format(snp_dict)
116    else:
117        lines = _tabular_format(snp_dict)
118
119    # export to file or return plain text
120    if path is not None:
121        out_path = f"{path}.{format_type}"
122        with open(out_path, 'w') as out_file:
123            out_file.write('\n'.join(lines))
124    else:
125        return '\n'.join(lines)
126
127
def fasta(sequence: str | dict, header: str | None = None, path: str | None = None) -> str | None:
    """
    Export a fasta sequence from str or alignment in dictionary format to either a string or save directly to file.
    The alignment format must have headers as keys and the corresponding sequence as values.

    :param sequence: single sequence (str) or alignment (dict of header -> sequence) to export
    :param header: header line for a single sequence; not used for dictionaries. Note that a single sequence
        without a header is written with the literal header '>None'.
    :param path: path to save the file
    :return: fasta formatted string if no path is given, otherwise None
    :raises ValueError: if a sequence contains characters not listed in config.POSSIBLE_CHARS, if a dictionary
        value is not a string or if sequence is neither a string nor a dictionary
    """
    allowed_chars = set(config.POSSIBLE_CHARS)

    def _validate_sequence(seq: str):
        if not set(seq).issubset(allowed_chars):
            raise ValueError(f'Sequence contains invalid characters. Detected chars: {set(seq)}')

    # isinstance (instead of an exact type comparison) also accepts str/dict subclasses
    if isinstance(sequence, str):
        _validate_sequence(sequence)
        records = [f'>{header}\n{sequence}']
    elif isinstance(sequence, dict):
        records = []
        for seq_header, seq in sequence.items():
            if not isinstance(seq, str):
                raise ValueError('Sequences in the dictionary must be strings.')
            _validate_sequence(seq)
            records.append(f'>{seq_header}\n{seq}')
    else:
        # previously unsupported input silently produced an empty string/file
        raise ValueError('Sequence must be a string or a dictionary of headers and sequences.')

    fasta_formated_sequence = '\n'.join(records)

    if path is None:
        return fasta_formated_sequence

    _check_and_create_path(path)
    # explicit encoding: headers are not guaranteed to be ASCII
    with open(path, 'w', encoding='utf-8') as out_file:
        out_file.write(fasta_formated_sequence)
    return None
159
160
161def stats(stat_data: list | ndarray, seperator: str = '\t', path: str | None = None) -> str | None:
162    """
163    Export a list of stats per nucleotide to tabular or csv format.
164
165    :param stat_data: list of stat values
166    :param seperator: seperator for values and index
167    :param path: path to save the file
168    :return: tabular/csv formatted string
169    """
170    # ini
171    _check_and_create_path(path)
172
173    lines = [f'position{seperator}value']
174
175    for idx, stat_val in enumerate(stat_data):
176        lines.append(f'{idx}{seperator}{stat_val}')
177
178    if path is not None:
179        with open(path, 'w') as out_file:
180            out_file.write('\n'.join(lines))
181    else:
182        return '\n'.join(lines)
183
184
185def orf(orf_dict: dict, chrom: str, path: str | None = None) -> str | ValueError:
186    """
187    Exports the ORF dictionary to a .bed file.
188
189    :param orf_dict: Dictionary containing ORF information.
190    :param path: Path to the output .bed file.
191    :param : Reference name
192    """
193    if not orf_dict:
194        raise ValueError("The ORF dictionary is empty. Nothing to export.")
195    else:
196        if list(orf_dict[list(orf_dict.keys())[0]].keys()) != ['location', 'frame', 'strand', 'conservation', 'internal']:
197            raise ValueError("The ORF dictionary has not the right format.")
198
199    _check_and_create_path(path)
200
201    lines = []
202
203    for orf_id, orf_data in orf_dict.items():
204        lines.append(
205            f"{chrom}\t{orf_data['location'][0][0]}\t{orf_data['location'][0][1]}\t{orf_id}\t{orf_data['conservation']:.2f}\t{orf_data['strand']}"
206        )
207
208    if path is not None:
209        with open(path, 'w') as out_file:
210            out_file.write('\n'.join(lines))
211    else:
212        return '\n'.join(lines)
213
214
def character_freq(char_dict: dict, seperator: str = '\t', path: str | None = None) -> str | None:
    """
    Export a character frequency dictionary to tabular or csv format.

    The table has the columns sequence, char, counts and % of non-gapped. The entry with the key 'total'
    and the gap character '-' are not exported.

    :param char_dict: Dictionary mapping sequence ids to dictionaries of characters, each holding a dictionary
        with the keys 'counts', '% of alignment' and/or '% of non-gapped'.
    :param seperator: separator for the table e.g. tab or comma
    :param path: Path to output table.

    :return: A string containing the character frequency table if no path is given, otherwise None.
    :raises ValueError: if the input is not a dictionary or contains invalid character or value keys.
    """

    def _validate():
        if not isinstance(char_dict, dict):
            raise ValueError('Data must be a dictionary.')
        for char_data in char_dict.values():
            for char, char_values in char_data.items():
                if char not in config.POSSIBLE_CHARS:
                    raise ValueError(f'The key {char} is invalid.')
                for value_key in char_values:
                    if value_key not in ['counts', '% of alignment', '% of non-gapped']:
                        raise ValueError(f'The key "{value_key}" is invalid.')

    # validate input
    _validate()

    lines = [f'sequence{seperator}char{seperator}counts{seperator}% of non-gapped']
    for seq_id, char_data in char_dict.items():
        if seq_id == 'total':
            continue
        for char, char_values in char_data.items():
            if char == '-':
                continue
            lines.append(
                f'{seq_id}{seperator}{char}{seperator}{char_values["counts"]}{seperator}{char_values["% of non-gapped"]}'
            )
    table = '\n'.join(lines)

    # export data
    if path is None:
        return table

    # create the output directory like the other export functions of this module do
    _check_and_create_path(path)
    with open(path, 'w', encoding='utf-8') as out_file:
        out_file.write(table)
    return None
256
257
258def percent_recovery(rec_dict: dict, seperator: str = '\t', path: str | None = None) -> str | None | ValueError:
259    """
260    Export percent_recovery dictionary to tabular or csv format.
261
262    :param rec_dict: Dictionary containing the character frequencies.
263    :param seperator: seperator for the table e.g. tabular or comma
264    :param path: Path to output table.
265
266    :return: A string containing the character frequency table.
267    :raises ValueError: if the input dictionary is missing required keys or format_type is invalid.
268    """
269    def _validate():
270        if not isinstance(rec_dict, dict):
271            raise ValueError('Data must be a dictionary.')
272        for key, value in rec_dict.items():
273            if type(key) != str:
274                raise ValueError(f'The key {key} is invalid.')
275            elif type(value) != float:
276                raise ValueError(f'The value {value} is invalid.')
277
278    # validate input
279    _validate()
280
281    lines = [F'sequence{seperator}% recovery']
282    for key, value in rec_dict.items():
283        lines.append(
284            f'{key}{seperator}{value}')
285
286    # export data
287    if path is not None:
288        with open(path, 'w') as out_file:
289            out_file.write('\n'.join(lines))
290    else:
291        return '\n'.join(lines)
def snps( snp_dict: dict, format_type: str = 'vcf', path: str | None = None) -> str | None | ValueError:
 26def snps(snp_dict: dict, format_type: str = 'vcf', path: str | None = None) -> str | None | ValueError:
 27    """
 28    Export a SNP dictionary to a VCF or tabular format. Importantly, the input dictionary has to be in the standard
 29    format that MSAexplorer produces.
 30
 31    :param snp_dict: Dictionary containing SNP positions and variant information.
 32    :param format_type: Format type ('vcf' or 'tabular'). Default is 'vcf'.
 33    :param path: Path to output VCF or tabular format. (optional)
 34    :return: A string containing the SNP data in the requested format.
 35    :raises ValueError: if the input dictionary is missing required keys or format_type is invalid.
 36    """
 37
 38    def _validate():
 39        if not isinstance(snp_dict, dict):
 40            raise ValueError('Input SNP data must be a dictionary.')
 41        for key in ['#CHROM', 'POS']:
 42            if key not in snp_dict:
 43                raise ValueError(f"Missing required key '{key}' in SNP data.")
 44        if not isinstance(snp_dict['POS'], dict):
 45            raise ValueError('Expected the \'POS\' key to contain a dictionary of positions.')
 46        if format_type not in ['vcf', 'tabular']:
 47            raise ValueError('Invalid format_type.')
 48        _check_and_create_path(path)
 49
 50    def _vcf_format(snp_dict: dict) -> list:
 51        """
 52        Produce  vcf formatted SNP data.
 53        :param snp_dict: dictionary containing SNP positions and variant information.
 54        :return: list of lines to write
 55        """
 56        output_lines = []
 57        # VCF header
 58        output_lines.append('##fileformat=VCFv4.2')
 59        output_lines.append('##source=MSAexplorer')
 60        output_lines.append('#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO')
 61        # process each SNP position in sorted order
 62        for pos in sorted(snp_dict['POS'].keys()):
 63            pos_info = snp_dict['POS'][pos]
 64            ref = pos_info.get('ref', '.')
 65            alt_dict = pos_info.get('ALT', {})
 66            # Create comma-separated list of alternative alleles
 67            alt_alleles = ",".join(alt_dict.keys()) if alt_dict else "."
 68            # Prepare INFO field: include allele frequencies and sequence IDs
 69            afs = []
 70            seq_ids = []
 71            for alt, details in alt_dict.items():
 72                af = details.get('AF', 0)
 73                afs.append(str(af))
 74                seq_ids.append("|".join(details.get('SEQ_ID', [])))
 75            info_fields = []
 76            if afs:
 77                info_fields.append("AF=" + ",".join(afs))
 78            if seq_ids:
 79                info_fields.append("SEQ_ID=" + ",".join(seq_ids))
 80            info = ";".join(info_fields) if info_fields else "."
 81
 82            # VCF is 1-indexed; we assume pos is 0-indexed and add 1
 83            line = f"{snp_dict['#CHROM']}\t{pos + 1}\t.\t{ref}\t{alt_alleles}\t.\t.\t{info}"
 84            output_lines.append(line)
 85
 86        return output_lines
 87
 88    def _tabular_format(snp_dict: dict) -> list:
 89        """
 90        Produce  tabular formatted SNP data.
 91
 92        :param snp_dict: dictionary containing SNP positions and variant information.
 93        :return: list of lines to write
 94        """
 95        output_lines = []
 96        # Create a header for the tabular output
 97        output_lines.append('CHROM\tPOS\tREF\tALT\tAF\tSEQ_ID')
 98
 99        # Process each SNP position and each alternative allele
100        for pos in sorted(snp_dict['POS'].keys()):
101            pos_info = snp_dict['POS'][pos]
102            ref = pos_info.get('ref', '.')
103            alt_dict = pos_info.get('ALT', {})
104            for alt, details in alt_dict.items():
105                af = details.get('AF', 0)
106                seq_id = ",".join(details.get('SEQ_ID', []))
107                output_lines.append(f"{snp_dict['#CHROM']}\t{pos + 1}\t{ref}\t{alt}\t{af}\t{seq_id}")
108
109        return output_lines
110
111    # validate correct input format
112    _validate()
113
114    # generate line data
115    if format_type == 'vcf':
116        lines = _vcf_format(snp_dict)
117    else:
118        lines = _tabular_format(snp_dict)
119
120    # export to file or return plain text
121    if path is not None:
122        out_path = f"{path}.{format_type}"
123        with open(out_path, 'w') as out_file:
124            out_file.write('\n'.join(lines))
125    else:
126        return '\n'.join(lines)

Export a SNP dictionary to a VCF or tabular format. Importantly, the input dictionary has to be in the standard format that MSAexplorer produces.

Parameters
  • snp_dict: Dictionary containing SNP positions and variant information.
  • format_type: Format type ('vcf' or 'tabular'). Default is 'vcf'.
  • path: Path to output VCF or tabular format. (optional)
Returns

A string containing the SNP data in the requested format.

Raises
  • ValueError: if the input dictionary is missing required keys or format_type is invalid.
def fasta( sequence: str | dict, header: str | None = None, path: str | None = None) -> str | None:
def fasta(sequence: str | dict, header: str | None = None, path: str | None = None) -> str | None:
    """
    Export a fasta sequence from str or alignment in dictionary format to either a string or save directly to file.
    The alignment format must have headers as keys and the corresponding sequence as values.

    :param sequence: single sequence (str) or alignment (dict of header -> sequence) to export
    :param header: header line for a single sequence; not used for dictionaries. Note that a single sequence
        without a header is written with the literal header '>None'.
    :param path: path to save the file
    :return: fasta formatted string if no path is given, otherwise None
    :raises ValueError: if a sequence contains characters not listed in config.POSSIBLE_CHARS, if a dictionary
        value is not a string or if sequence is neither a string nor a dictionary
    """
    allowed_chars = set(config.POSSIBLE_CHARS)

    def _validate_sequence(seq: str):
        if not set(seq).issubset(allowed_chars):
            raise ValueError(f'Sequence contains invalid characters. Detected chars: {set(seq)}')

    # isinstance (instead of an exact type comparison) also accepts str/dict subclasses
    if isinstance(sequence, str):
        _validate_sequence(sequence)
        records = [f'>{header}\n{sequence}']
    elif isinstance(sequence, dict):
        records = []
        for seq_header, seq in sequence.items():
            if not isinstance(seq, str):
                raise ValueError('Sequences in the dictionary must be strings.')
            _validate_sequence(seq)
            records.append(f'>{seq_header}\n{seq}')
    else:
        # previously unsupported input silently produced an empty string/file
        raise ValueError('Sequence must be a string or a dictionary of headers and sequences.')

    fasta_formated_sequence = '\n'.join(records)

    if path is None:
        return fasta_formated_sequence

    _check_and_create_path(path)
    # explicit encoding: headers are not guaranteed to be ASCII
    with open(path, 'w', encoding='utf-8') as out_file:
        out_file.write(fasta_formated_sequence)
    return None

Export a fasta sequence from str or alignment in dictionary format to either a string or save directly to file. The alignment format must have headers as keys and the corresponding sequence as values.

Parameters
  • sequence: sequence to export
  • header: header line used when the sequence is given as a single string (not used for dictionaries)
  • path: path to save the file
Returns

fasta formatted string

def stats( stat_data: list | numpy.ndarray, seperator: str = '\t', path: str | None = None) -> str | None:
162def stats(stat_data: list | ndarray, seperator: str = '\t', path: str | None = None) -> str | None:
163    """
164    Export a list of stats per nucleotide to tabular or csv format.
165
166    :param stat_data: list of stat values
167    :param seperator: seperator for values and index
168    :param path: path to save the file
169    :return: tabular/csv formatted string
170    """
171    # ini
172    _check_and_create_path(path)
173
174    lines = [f'position{seperator}value']
175
176    for idx, stat_val in enumerate(stat_data):
177        lines.append(f'{idx}{seperator}{stat_val}')
178
179    if path is not None:
180        with open(path, 'w') as out_file:
181            out_file.write('\n'.join(lines))
182    else:
183        return '\n'.join(lines)

Export a list of stats per nucleotide to tabular or csv format.

Parameters
  • stat_data: list of stat values
  • seperator: separator placed between the index and the value
  • path: path to save the file
Returns

tabular/csv formatted string

def orf(orf_dict: dict, chrom: str, path: str | None = None) -> str | ValueError:
186def orf(orf_dict: dict, chrom: str, path: str | None = None) -> str | ValueError:
187    """
188    Exports the ORF dictionary to a .bed file.
189
190    :param orf_dict: Dictionary containing ORF information.
191    :param path: Path to the output .bed file.
192    :param : Reference name
193    """
194    if not orf_dict:
195        raise ValueError("The ORF dictionary is empty. Nothing to export.")
196    else:
197        if list(orf_dict[list(orf_dict.keys())[0]].keys()) != ['location', 'frame', 'strand', 'conservation', 'internal']:
198            raise ValueError("The ORF dictionary has not the right format.")
199
200    _check_and_create_path(path)
201
202    lines = []
203
204    for orf_id, orf_data in orf_dict.items():
205        lines.append(
206            f"{chrom}\t{orf_data['location'][0][0]}\t{orf_data['location'][0][1]}\t{orf_id}\t{orf_data['conservation']:.2f}\t{orf_data['strand']}"
207        )
208
209    if path is not None:
210        with open(path, 'w') as out_file:
211            out_file.write('\n'.join(lines))
212    else:
213        return '\n'.join(lines)

Exports the ORF dictionary to a .bed file.

Parameters
  • orf_dict: Dictionary containing ORF information.
  • path: Path to the output .bed file.
  • chrom: Reference name
def character_freq( char_dict: dict, seperator: str = '\t', path: str | None = None) -> str | None | ValueError:
def character_freq(char_dict: dict, seperator: str = '\t', path: str | None = None) -> str | None:
    """
    Export a character frequency dictionary to tabular or csv format.

    The table has the columns sequence, char, counts and % of non-gapped. The entry with the key 'total'
    and the gap character '-' are not exported.

    :param char_dict: Dictionary mapping sequence ids to dictionaries of characters, each holding a dictionary
        with the keys 'counts', '% of alignment' and/or '% of non-gapped'.
    :param seperator: separator for the table e.g. tab or comma
    :param path: Path to output table.

    :return: A string containing the character frequency table if no path is given, otherwise None.
    :raises ValueError: if the input is not a dictionary or contains invalid character or value keys.
    """

    def _validate():
        if not isinstance(char_dict, dict):
            raise ValueError('Data must be a dictionary.')
        for char_data in char_dict.values():
            for char, char_values in char_data.items():
                if char not in config.POSSIBLE_CHARS:
                    raise ValueError(f'The key {char} is invalid.')
                for value_key in char_values:
                    if value_key not in ['counts', '% of alignment', '% of non-gapped']:
                        raise ValueError(f'The key "{value_key}" is invalid.')

    # validate input
    _validate()

    lines = [f'sequence{seperator}char{seperator}counts{seperator}% of non-gapped']
    for seq_id, char_data in char_dict.items():
        if seq_id == 'total':
            continue
        for char, char_values in char_data.items():
            if char == '-':
                continue
            lines.append(
                f'{seq_id}{seperator}{char}{seperator}{char_values["counts"]}{seperator}{char_values["% of non-gapped"]}'
            )
    table = '\n'.join(lines)

    # export data
    if path is None:
        return table

    # create the output directory like the other export functions of this module do
    _check_and_create_path(path)
    with open(path, 'w', encoding='utf-8') as out_file:
        out_file.write(table)
    return None

Export a character frequency dictionary to tabular or csv format.

Parameters
  • char_dict: Dictionary containing the character frequencies.
  • seperator: separator for the table, e.g. tab or comma
  • path: Path to output table.
Returns

A string containing the character frequency table.

Raises
  • ValueError: if the input is not a dictionary or contains invalid character or value keys.
def percent_recovery( rec_dict: dict, seperator: str = '\t', path: str | None = None) -> str | None | ValueError:
259def percent_recovery(rec_dict: dict, seperator: str = '\t', path: str | None = None) -> str | None | ValueError:
260    """
261    Export percent_recovery dictionary to tabular or csv format.
262
263    :param rec_dict: Dictionary containing the character frequencies.
264    :param seperator: seperator for the table e.g. tabular or comma
265    :param path: Path to output table.
266
267    :return: A string containing the character frequency table.
268    :raises ValueError: if the input dictionary is missing required keys or format_type is invalid.
269    """
270    def _validate():
271        if not isinstance(rec_dict, dict):
272            raise ValueError('Data must be a dictionary.')
273        for key, value in rec_dict.items():
274            if type(key) != str:
275                raise ValueError(f'The key {key} is invalid.')
276            elif type(value) != float:
277                raise ValueError(f'The value {value} is invalid.')
278
279    # validate input
280    _validate()
281
282    lines = [F'sequence{seperator}% recovery']
283    for key, value in rec_dict.items():
284        lines.append(
285            f'{key}{seperator}{value}')
286
287    # export data
288    if path is not None:
289        with open(path, 'w') as out_file:
290            out_file.write('\n'.join(lines))
291    else:
292        return '\n'.join(lines)

Export percent_recovery dictionary to tabular or csv format.

Parameters
  • rec_dict: Dictionary mapping sequence names to their percent recovery.
  • seperator: separator for the table, e.g. tab or comma
  • path: Path to output table.
Returns

A string containing the percent recovery table.

Raises
  • ValueError: if the input is not a dictionary or contains an invalid key or value.