msaexplorer.export
Export module
This module lets you export data produced with MSA explorer.
Functions:
"""
# Export module

This module lets you export data produced with MSA explorer.

## Functions:
"""

import numpy as np
from numpy import ndarray

from msaexplorer import config
from msaexplorer._data_classes import AlignmentStats, OrfCollection, VariantCollection
from msaexplorer._helpers import _check_and_create_path


def snps(snp_data: VariantCollection, format_type: str = 'vcf', path: str | None = None) -> str | None:
    """
    Write the SNPs of a VariantCollection as VCF or as a tab-separated table.

    :param snp_data: VariantCollection with the variant information per position.
    :param format_type: output format, 'vcf' (default) or 'tabular'.
    :param path: optional output path; '.<format_type>' is appended to it before writing.
    :return: the formatted text, or None if the text was written to a file.
    :raises ValueError: if snp_data is not a VariantCollection or format_type is unknown.
    """
    # Validate everything up front so nothing is formatted or written for bad input.
    if not isinstance(snp_data, VariantCollection):
        raise ValueError('Input SNP data must be a VariantCollection dataclass.')
    if format_type not in ['vcf', 'tabular']:
        raise ValueError('Invalid format_type.')
    _check_and_create_path(path)

    def _as_vcf() -> list[str]:
        """Return the VCF header followed by one record per position."""
        records = [
            '##fileformat=VCFv4.2',
            '##source=MSAexplorer',
            '#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO',
        ]
        for site in sorted(snp_data.positions.keys()):
            site_info = snp_data.positions[site]
            alternatives = site_info.alt
            if alternatives:
                alt_column = ','.join(alternatives.keys())
            else:
                alt_column = '.'

            # Each alternative maps to a (frequency, sequence ids) pair.
            frequencies = []
            carriers = []
            for frequency, ids in alternatives.values():
                frequencies.append(str(frequency))
                carriers.append('|'.join(ids))

            if frequencies:
                info_column = 'AF=' + ','.join(frequencies) + ';SEQ_ID=' + ','.join(carriers)
            else:
                info_column = '.'

            # Stored positions are shifted by one for the output.
            records.append(
                f'{snp_data.chrom}\t{site + 1}\t.\t{site_info.ref}\t{alt_column}\t.\t.\t{info_column}'
            )
        return records

    def _as_table() -> list[str]:
        """Return a header row followed by one row per alternative allele."""
        rows = ['CHROM\tPOS\tREF\tALT\tAF\tSEQ_ID']
        for site in sorted(snp_data.positions.keys()):
            site_info = snp_data.positions[site]
            for allele, (frequency, ids) in site_info.alt.items():
                joined_ids = ','.join(ids)
                rows.append(
                    f'{snp_data.chrom}\t{site + 1}\t{site_info.ref}\t{allele}\t{frequency}\t{joined_ids}'
                )
        return rows

    if format_type == 'vcf':
        text = '\n'.join(_as_vcf())
    else:
        text = '\n'.join(_as_table())

    if path is None:
        return text

    with open(f'{path}.{format_type}', 'w') as out_file:
        out_file.write(text)
    return None


def fasta(sequence: str | dict, header: str | None = None, path: str | None = None) -> str | None:
    """
    Export a single sequence or an alignment in FASTA format, either as a string or directly to a file.

    An alignment is passed as a dictionary with headers as keys and the corresponding
    sequences as values.

    :param sequence: sequence string, or dictionary mapping headers to sequence strings
    :param header: header used when a single sequence string is passed; ignored for dictionaries
    :param path: path to save the file; if None the FASTA text is returned instead
    :return: FASTA formatted string, or None if the text was written to a file
    :raises ValueError: if sequence is neither a string nor a dictionary, if a dictionary value
        is not a string, or if a sequence contains characters outside config.POSSIBLE_CHARS
    """
    # Build the allowed character set once instead of once per sequence.
    allowed_chars = set(config.POSSIBLE_CHARS)

    def _validate_sequence(seq: str) -> None:
        if not set(seq).issubset(allowed_chars):
            raise ValueError(f'Sequence contains invalid characters. Detected chars: {set(seq)}')

    _check_and_create_path(path)

    if isinstance(sequence, str):
        _validate_sequence(sequence)
        # NOTE(review): a missing header is written literally as '>None' - confirm whether
        # a header should be required for single sequences.
        records = [f'>{header}\n{sequence}']
    elif isinstance(sequence, dict):
        records = []
        # Separate loop names so the function parameters are not rebound while iterating.
        for seq_header, seq in sequence.items():
            if not isinstance(seq, str):
                raise ValueError('Sequences in the dictionary must be strings.')
            _validate_sequence(seq)
            records.append(f'>{seq_header}\n{seq}')
    else:
        # Previously unsupported input silently produced an empty string / empty file.
        raise ValueError('Sequence must be a string or a dictionary of header: sequence pairs.')

    fasta_formatted = '\n'.join(records)

    if path is not None:
        with open(path, 'w') as out_file:
            out_file.write(fasta_formatted)
        return None
    return fasta_formatted


def stats(stat_data: AlignmentStats | list | ndarray, seperator: str = '\t', path: str | None = None) -> str | None:
    """
    Turn per-position statistics into a two-column table (position, value).

    :param stat_data: AlignmentStats instance, or a plain list/array of values
    :param seperator: string placed between the position and the value
    :param path: path to save the file; if None the table is returned instead
    :return: the table as a string, or None if it was written to a file
    """
    _check_and_create_path(path)

    if isinstance(stat_data, AlignmentStats):
        index, measurements = stat_data.positions, stat_data.values
    else:
        # Plain sequences carry no positions, so number the values from zero.
        measurements = stat_data
        index = np.arange(len(measurements), dtype=int)

    header_row = f'position{seperator}value'
    body_rows = [f'{idx}{seperator}{val}' for idx, val in zip(index, measurements)]
    table = '\n'.join([header_row, *body_rows])

    if path is None:
        return table

    with open(path, 'w') as out_file:
        out_file.write(table)


def orf(orfs: OrfCollection, chrom: str, path: str | None = None) -> str | None:
    """
    Export an ORF collection as tab-separated, BED-style lines.

    One line is written per ORF with the columns: chrom, start, end, ORF id,
    conservation (two decimals) and strand. Only the first entry of each ORF's
    location list is exported.

    :param orfs: OrfCollection instance
    :param chrom: CHROM identifier written to the first column.
    :param path: Path to the output .bed file; if None the text is returned instead.
    :return: The formatted lines as a string, or None if they were written to a file.
    :raises ValueError: if orfs is not an OrfCollection or is empty.
    """
    if not isinstance(orfs, OrfCollection):
        raise ValueError('The ORF collection must be an instance of msaexplorer._data_classes.OrfCollection.')

    if not orfs:
        raise ValueError('The ORF collection is empty.')

    _check_and_create_path(path)

    lines = []
    for orf_id, orf_data in orfs.items():
        loc = orf_data.location[0]
        lines.append(
            f'{chrom}\t{loc[0]}\t{loc[1]}\t{orf_id}\t{orf_data.conservation:.2f}\t{orf_data.strand}'
        )

    bed_text = '\n'.join(lines)

    if path is not None:
        with open(path, 'w') as out_file:
            out_file.write(bed_text)
        return None
    return bed_text


def character_freq(char_dict: dict, seperator: str = '\t', path: str | None = None) -> str | None:
    """
    Export a character frequency dictionary to tabular or csv format.

    The dictionary is nested three levels deep: sequence id -> character -> statistic name.
    The 'total' entry and the gap character '-' are validated but not exported.

    :param char_dict: Dictionary containing the character frequencies.
    :param seperator: separator for the table, e.g. tab or comma
    :param path: Path to output table; if None the table is returned instead.

    :return: A string containing the character frequency table, or None if it was written to a file.
    :raises ValueError: if the input is not a dictionary of dictionaries, contains invalid keys,
        or an exported entry is missing 'counts' or '% of non-gapped'.
    """
    from collections.abc import Mapping

    allowed_stats = ('counts', '% of alignment', '% of non-gapped')

    def _validate() -> None:
        if not isinstance(char_dict, dict):
            raise ValueError('Data must be a dictionary.')
        for seq_id, char_stats in char_dict.items():
            if not isinstance(char_stats, Mapping):
                raise ValueError(f'The entry for "{seq_id}" must be a dictionary.')
            for char, stat_values in char_stats.items():
                if char not in config.POSSIBLE_CHARS:
                    raise ValueError(f'The key {char} is invalid.')
                if not isinstance(stat_values, Mapping):
                    raise ValueError(f'The entry for "{char}" of "{seq_id}" must be a dictionary.')
                for stat_name in stat_values:
                    if stat_name not in allowed_stats:
                        raise ValueError(f'The key "{stat_name}" is invalid.')

    # validate input
    _validate()

    lines = [f'sequence{seperator}char{seperator}counts{seperator}% of non-gapped']
    for seq_id, char_stats in char_dict.items():
        if seq_id == 'total':
            continue
        for char, stat_values in char_stats.items():
            if char == '-':
                continue
            # Only exported entries need both columns, so the check lives here.
            try:
                counts = stat_values['counts']
                non_gapped = stat_values['% of non-gapped']
            except KeyError as err:
                raise ValueError(
                    f'The entry for "{char}" of "{seq_id}" is missing the required key {err}.'
                ) from err
            lines.append(f'{seq_id}{seperator}{char}{seperator}{counts}{seperator}{non_gapped}')

    table = '\n'.join(lines)

    # export data
    if path is not None:
        with open(path, 'w') as out_file:
            out_file.write(table)
        return None
    return table


def percent_recovery(rec_dict: dict, seperator: str = '\t', path: str | None = None) -> str | None:
    """
    Export a percent recovery dictionary to tabular or csv format.

    :param rec_dict: Dictionary mapping sequence ids (strings) to their recovery values (numbers).
    :param seperator: separator for the table, e.g. tab or comma
    :param path: Path to output table; if None the table is returned instead.

    :return: A string containing the percent recovery table, or None if it was written to a file.
    :raises ValueError: if the input is not a dictionary, a key is not a string,
        or a value is not a real number.
    """
    from numbers import Real

    def _validate() -> None:
        if not isinstance(rec_dict, dict):
            raise ValueError('Data must be a dictionary.')
        for key, value in rec_dict.items():
            if not isinstance(key, str):
                raise ValueError(f'The key {key} is invalid.')
            # Accept any real number (ints, float subclasses), not only exact floats.
            # bool is an int subclass but is not a meaningful recovery value.
            if isinstance(value, bool) or not isinstance(value, Real):
                raise ValueError(f'The value {value} is invalid.')

    # validate input
    _validate()

    lines = [f'sequence{seperator}% recovery']
    lines.extend(f'{key}{seperator}{value}' for key, value in rec_dict.items())
    table = '\n'.join(lines)

    # export data
    if path is not None:
        with open(path, 'w') as out_file:
            out_file.write(table)
        return None
    return table
def snps(snp_data: VariantCollection, format_type: str = 'vcf', path: str | None = None) -> str | None:
    """
    Write the SNPs of a VariantCollection as VCF or as a tab-separated table.

    :param snp_data: VariantCollection with the variant information per position.
    :param format_type: output format, 'vcf' (default) or 'tabular'.
    :param path: optional output path; '.<format_type>' is appended to it before writing.
    :return: the formatted text, or None if the text was written to a file.
    :raises ValueError: if snp_data is not a VariantCollection or format_type is unknown.
    """
    # Validate everything up front so nothing is formatted or written for bad input.
    if not isinstance(snp_data, VariantCollection):
        raise ValueError('Input SNP data must be a VariantCollection dataclass.')
    if format_type not in ['vcf', 'tabular']:
        raise ValueError('Invalid format_type.')
    _check_and_create_path(path)

    def _as_vcf() -> list[str]:
        """Return the VCF header followed by one record per position."""
        records = [
            '##fileformat=VCFv4.2',
            '##source=MSAexplorer',
            '#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO',
        ]
        for site in sorted(snp_data.positions.keys()):
            site_info = snp_data.positions[site]
            alternatives = site_info.alt
            if alternatives:
                alt_column = ','.join(alternatives.keys())
            else:
                alt_column = '.'

            # Each alternative maps to a (frequency, sequence ids) pair.
            frequencies = []
            carriers = []
            for frequency, ids in alternatives.values():
                frequencies.append(str(frequency))
                carriers.append('|'.join(ids))

            if frequencies:
                info_column = 'AF=' + ','.join(frequencies) + ';SEQ_ID=' + ','.join(carriers)
            else:
                info_column = '.'

            # Stored positions are shifted by one for the output.
            records.append(
                f'{snp_data.chrom}\t{site + 1}\t.\t{site_info.ref}\t{alt_column}\t.\t.\t{info_column}'
            )
        return records

    def _as_table() -> list[str]:
        """Return a header row followed by one row per alternative allele."""
        rows = ['CHROM\tPOS\tREF\tALT\tAF\tSEQ_ID']
        for site in sorted(snp_data.positions.keys()):
            site_info = snp_data.positions[site]
            for allele, (frequency, ids) in site_info.alt.items():
                joined_ids = ','.join(ids)
                rows.append(
                    f'{snp_data.chrom}\t{site + 1}\t{site_info.ref}\t{allele}\t{frequency}\t{joined_ids}'
                )
        return rows

    if format_type == 'vcf':
        text = '\n'.join(_as_vcf())
    else:
        text = '\n'.join(_as_table())

    if path is None:
        return text

    with open(f'{path}.{format_type}', 'w') as out_file:
        out_file.write(text)
    return None
Export SNP data from a VariantCollection to VCF or tabular format.
Parameters
- snp_data: VariantCollection containing SNP positions and variant information.
- format_type: Format type ('vcf' or 'tabular'). Default is 'vcf'.
- path: Path to output VCF or tabular format. (optional)
Returns
A string containing the SNP data in the requested format.
Raises
- ValueError: If the input type is invalid or format_type is invalid.
def fasta(sequence: str | dict, header: str | None = None, path: str | None = None) -> str | None:
    """
    Export a single sequence or an alignment in FASTA format, either as a string or directly to a file.

    An alignment is passed as a dictionary with headers as keys and the corresponding
    sequences as values.

    :param sequence: sequence string, or dictionary mapping headers to sequence strings
    :param header: header used when a single sequence string is passed; ignored for dictionaries
    :param path: path to save the file; if None the FASTA text is returned instead
    :return: FASTA formatted string, or None if the text was written to a file
    :raises ValueError: if sequence is neither a string nor a dictionary, if a dictionary value
        is not a string, or if a sequence contains characters outside config.POSSIBLE_CHARS
    """
    # Build the allowed character set once instead of once per sequence.
    allowed_chars = set(config.POSSIBLE_CHARS)

    def _validate_sequence(seq: str) -> None:
        if not set(seq).issubset(allowed_chars):
            raise ValueError(f'Sequence contains invalid characters. Detected chars: {set(seq)}')

    _check_and_create_path(path)

    if isinstance(sequence, str):
        _validate_sequence(sequence)
        # NOTE(review): a missing header is written literally as '>None' - confirm whether
        # a header should be required for single sequences.
        records = [f'>{header}\n{sequence}']
    elif isinstance(sequence, dict):
        records = []
        # Separate loop names so the function parameters are not rebound while iterating.
        for seq_header, seq in sequence.items():
            if not isinstance(seq, str):
                raise ValueError('Sequences in the dictionary must be strings.')
            _validate_sequence(seq)
            records.append(f'>{seq_header}\n{seq}')
    else:
        # Previously unsupported input silently produced an empty string / empty file.
        raise ValueError('Sequence must be a string or a dictionary of header: sequence pairs.')

    fasta_formatted = '\n'.join(records)

    if path is not None:
        with open(path, 'w') as out_file:
            out_file.write(fasta_formatted)
        return None
    return fasta_formatted
Export a fasta sequence from str or alignment in dictionary format to either a string or save directly to file. The alignment format must have headers as keys and the corresponding sequence as values.
Parameters
- sequence: sequence to export
- header: optional header line, used when a single sequence string is passed
- path: path to save the file
Returns
fasta formatted string
def stats(stat_data: AlignmentStats | list | ndarray, seperator: str = '\t', path: str | None = None) -> str | None:
    """
    Turn per-position statistics into a two-column table (position, value).

    :param stat_data: AlignmentStats instance, or a plain list/array of values
    :param seperator: string placed between the position and the value
    :param path: path to save the file; if None the table is returned instead
    :return: the table as a string, or None if it was written to a file
    """
    _check_and_create_path(path)

    if isinstance(stat_data, AlignmentStats):
        index, measurements = stat_data.positions, stat_data.values
    else:
        # Plain sequences carry no positions, so number the values from zero.
        measurements = stat_data
        index = np.arange(len(measurements), dtype=int)

    header_row = f'position{seperator}value'
    body_rows = [f'{idx}{seperator}{val}' for idx, val in zip(index, measurements)]
    table = '\n'.join([header_row, *body_rows])

    if path is None:
        return table

    with open(path, 'w') as out_file:
        out_file.write(table)
Export a list of stats per nucleotide to tabular or csv format.
Parameters
- stat_data: position statistic dataclass or list/array of values
- seperator: separator placed between the position and the value
- path: path to save the file
Returns
tabular/csv formatted string
def orf(orfs: OrfCollection, chrom: str, path: str | None = None) -> str | None:
    """
    Export an ORF collection as tab-separated, BED-style lines.

    One line is written per ORF with the columns: chrom, start, end, ORF id,
    conservation (two decimals) and strand. Only the first entry of each ORF's
    location list is exported.

    :param orfs: OrfCollection instance
    :param chrom: CHROM identifier written to the first column.
    :param path: Path to the output .bed file; if None the text is returned instead.
    :return: The formatted lines as a string, or None if they were written to a file.
    :raises ValueError: if orfs is not an OrfCollection or is empty.
    """
    if not isinstance(orfs, OrfCollection):
        raise ValueError('The ORF collection must be an instance of msaexplorer._data_classes.OrfCollection.')

    if not orfs:
        raise ValueError('The ORF collection is empty.')

    _check_and_create_path(path)

    lines = []
    for orf_id, orf_data in orfs.items():
        loc = orf_data.location[0]
        lines.append(
            f'{chrom}\t{loc[0]}\t{loc[1]}\t{orf_id}\t{orf_data.conservation:.2f}\t{orf_data.strand}'
        )

    bed_text = '\n'.join(lines)

    if path is not None:
        with open(path, 'w') as out_file:
            out_file.write(bed_text)
        return None
    return bed_text
Exports the ORF collection to a .bed file.
Parameters
- orfs: OrfCollection instance
- chrom: CHROM identifier for bed format.
- path: Path to the output .bed file.
def character_freq(char_dict: dict, seperator: str = '\t', path: str | None = None) -> str | None:
    """
    Export a character frequency dictionary to tabular or csv format.

    The dictionary is nested three levels deep: sequence id -> character -> statistic name.
    The 'total' entry and the gap character '-' are validated but not exported.

    :param char_dict: Dictionary containing the character frequencies.
    :param seperator: separator for the table, e.g. tab or comma
    :param path: Path to output table; if None the table is returned instead.

    :return: A string containing the character frequency table, or None if it was written to a file.
    :raises ValueError: if the input is not a dictionary of dictionaries, contains invalid keys,
        or an exported entry is missing 'counts' or '% of non-gapped'.
    """
    from collections.abc import Mapping

    allowed_stats = ('counts', '% of alignment', '% of non-gapped')

    def _validate() -> None:
        if not isinstance(char_dict, dict):
            raise ValueError('Data must be a dictionary.')
        for seq_id, char_stats in char_dict.items():
            if not isinstance(char_stats, Mapping):
                raise ValueError(f'The entry for "{seq_id}" must be a dictionary.')
            for char, stat_values in char_stats.items():
                if char not in config.POSSIBLE_CHARS:
                    raise ValueError(f'The key {char} is invalid.')
                if not isinstance(stat_values, Mapping):
                    raise ValueError(f'The entry for "{char}" of "{seq_id}" must be a dictionary.')
                for stat_name in stat_values:
                    if stat_name not in allowed_stats:
                        raise ValueError(f'The key "{stat_name}" is invalid.')

    # validate input
    _validate()

    lines = [f'sequence{seperator}char{seperator}counts{seperator}% of non-gapped']
    for seq_id, char_stats in char_dict.items():
        if seq_id == 'total':
            continue
        for char, stat_values in char_stats.items():
            if char == '-':
                continue
            # Only exported entries need both columns, so the check lives here.
            try:
                counts = stat_values['counts']
                non_gapped = stat_values['% of non-gapped']
            except KeyError as err:
                raise ValueError(
                    f'The entry for "{char}" of "{seq_id}" is missing the required key {err}.'
                ) from err
            lines.append(f'{seq_id}{seperator}{char}{seperator}{counts}{seperator}{non_gapped}')

    table = '\n'.join(lines)

    # export data
    if path is not None:
        with open(path, 'w') as out_file:
            out_file.write(table)
        return None
    return table
Export a character frequency dictionary to tabular or csv format.
Parameters
- char_dict: Dictionary containing the character frequencies.
- seperator: separator for the table, e.g. tab or comma
- path: Path to output table.
Returns
A string containing the character frequency table.
Raises
- ValueError: if the input is not a dictionary or contains invalid keys.
227def percent_recovery(rec_dict: dict, seperator: str = '\t', path: str | None = None) -> str | None | ValueError: 228 """ 229 Export percent_recovery dictionary to tabular or csv format. 230 231 :param rec_dict: Dictionary containing the character frequencies. 232 :param seperator: seperator for the table e.g. tabular or comma 233 :param path: Path to output table. 234 235 :return: A string containing the character frequency table. 236 :raises ValueError: if the input dictionary is missing required keys or format_type is invalid. 237 """ 238 def _validate(): 239 if not isinstance(rec_dict, dict): 240 raise ValueError('Data must be a dictionary.') 241 for key, value in rec_dict.items(): 242 if type(key) != str: 243 raise ValueError(f'The key {key} is invalid.') 244 elif type(value) != float: 245 raise ValueError(f'The value {value} is invalid.') 246 247 # validate input 248 _validate() 249 250 lines = [F'sequence{seperator}% recovery'] 251 for key, value in rec_dict.items(): 252 lines.append( 253 f'{key}{seperator}{value}') 254 255 # export data 256 if path is not None: 257 with open(path, 'w') as out_file: 258 out_file.write('\n'.join(lines)) 259 else: 260 return '\n'.join(lines)
Export percent_recovery dictionary to tabular or csv format.
Parameters
- rec_dict: Dictionary mapping each sequence id to its percent recovery.
- seperator: separator for the table, e.g. tab or comma
- path: Path to output table.
Returns
A string containing the percent recovery table.
Raises
- ValueError: if the input is not a dictionary, a key is not a string, or a value is not a valid number.