msaexplorer.export
Export module
This module lets you export data produced with MSAexplorer.
Functions:
"""
# Export module

This module lets you export data produced with MSAexplorer.

## Functions:
"""

import os
from numpy import ndarray
from msaexplorer import config


def _check_and_create_path(path: str | None):
    """
    Create the parent directory of ``path`` if it doesn't exist.

    :param path: path to the output file; None (no file output) is ignored
    """
    if path is not None:
        output_dir = os.path.dirname(path)
        if output_dir:
            # exist_ok avoids the race between checking for and creating the directory
            os.makedirs(output_dir, exist_ok=True)


def snps(snp_dict: dict, format_type: str = 'vcf', path: str | None = None) -> str | None:
    """
    Export a SNP dictionary to a VCF or tabular format. Importantly, the input dictionary has to be in the standard
    format that MSAexplorer produces.

    :param snp_dict: Dictionary containing SNP positions and variant information. Requires the keys '#CHROM' and
        'POS'; 'POS' maps each position (assumed to be 0-indexed) to a dictionary with the optional keys 'ref'
        and 'ALT'.
    :param format_type: Format type ('vcf' or 'tabular'). Default is 'vcf'.
    :param path: Output path without extension; the data is written to ``<path>.<format_type>``. (optional)
    :return: A string containing the SNP data in the requested format or None if the data was written to a file.
    :raises ValueError: if the input dictionary is missing required keys or format_type is invalid.
    """

    def _validate():
        if not isinstance(snp_dict, dict):
            raise ValueError('Input SNP data must be a dictionary.')
        for key in ('#CHROM', 'POS'):
            if key not in snp_dict:
                raise ValueError(f"Missing required key '{key}' in SNP data.")
        if not isinstance(snp_dict['POS'], dict):
            raise ValueError('Expected the \'POS\' key to contain a dictionary of positions.')
        if format_type not in ('vcf', 'tabular'):
            raise ValueError('Invalid format_type.')

    def _vcf_format() -> list:
        """
        Produce vcf formatted SNP data.

        :return: list of lines to write
        """
        chrom = snp_dict['#CHROM']
        output_lines = [
            '##fileformat=VCFv4.2',
            '##source=MSAexplorer',
            '#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO',
        ]
        # one line per SNP position, in sorted order
        for pos in sorted(snp_dict['POS']):
            pos_info = snp_dict['POS'][pos]
            ref = pos_info.get('ref', '.')
            alt_dict = pos_info.get('ALT', {})
            if alt_dict:
                alt_alleles = ','.join(alt_dict)
                # INFO field: allele frequencies and sequence IDs, one entry per alternative allele
                afs = ','.join(str(details.get('AF', 0)) for details in alt_dict.values())
                seq_ids = ','.join('|'.join(details.get('SEQ_ID', [])) for details in alt_dict.values())
                info = f'AF={afs};SEQ_ID={seq_ids}'
            else:
                alt_alleles = info = '.'
            # VCF is 1-indexed; we assume pos is 0-indexed and add 1
            output_lines.append(f'{chrom}\t{pos + 1}\t.\t{ref}\t{alt_alleles}\t.\t.\t{info}')

        return output_lines

    def _tabular_format() -> list:
        """
        Produce tabular formatted SNP data.

        :return: list of lines to write
        """
        chrom = snp_dict['#CHROM']
        output_lines = ['CHROM\tPOS\tREF\tALT\tAF\tSEQ_ID']
        # one line per SNP position and alternative allele
        for pos in sorted(snp_dict['POS']):
            pos_info = snp_dict['POS'][pos]
            ref = pos_info.get('ref', '.')
            for alt, details in pos_info.get('ALT', {}).items():
                af = details.get('AF', 0)
                seq_id = ','.join(details.get('SEQ_ID', []))
                output_lines.append(f'{chrom}\t{pos + 1}\t{ref}\t{alt}\t{af}\t{seq_id}')

        return output_lines

    # validate correct input format
    _validate()

    # generate line data
    text = '\n'.join(_vcf_format() if format_type == 'vcf' else _tabular_format())

    # export to file or return plain text
    if path is None:
        return text
    # create the output directory only once the data has been formatted successfully
    _check_and_create_path(path)
    with open(f'{path}.{format_type}', 'w', encoding='utf-8') as out_file:
        out_file.write(text)
    return None


def fasta(sequence: str | dict, header: str | None = None, path: str | None = None) -> str | None:
    """
    Export a fasta sequence from str or alignment in dictionary format to either a string or save directly to file.
    The alignment format must have headers as keys and the corresponding sequence as values.

    :param sequence: sequence to export (a string or a dictionary of header/sequence pairs)
    :param header: header of the sequence; only used if ``sequence`` is a string. Note that a header of None is
        written literally as '>None'.
    :param path: path to save the file
    :return: fasta formatted string or None if the data was written to a file
    :raises ValueError: if a sequence is not a string, contains characters that are not part of
        ``config.POSSIBLE_CHARS`` or if ``sequence`` is neither a string nor a dictionary.
    """
    def _validate_sequence(seq: str):
        if not set(seq).issubset(set(config.POSSIBLE_CHARS)):
            raise ValueError(f'Sequence contains invalid characters. Detected chars: {set(seq)}')

    if isinstance(sequence, str):
        _validate_sequence(sequence)
        records = [f'>{header}\n{sequence}']
    elif isinstance(sequence, dict):
        records = []
        for seq_header, seq in sequence.items():
            if not isinstance(seq, str):
                raise ValueError('Sequences in the dictionary must be strings.')
            _validate_sequence(seq)
            records.append(f'>{seq_header}\n{seq}')
    else:
        # previously unsupported types silently produced an empty string/file
        raise ValueError('Sequence must be a string or a dictionary of header/sequence pairs.')

    fasta_formated_sequence = '\n'.join(records)

    if path is None:
        return fasta_formated_sequence
    _check_and_create_path(path)
    with open(path, 'w', encoding='utf-8') as out_file:
        out_file.write(fasta_formated_sequence)
    return None


def stats(stat_data: list | ndarray, seperator: str = '\t', path: str | None = None) -> str | None:
    """
    Export a list of stats per nucleotide to tabular or csv format.

    :param stat_data: list of stat values
    :param seperator: separator for values and index
    :param path: path to save the file
    :return: tabular/csv formatted string (header 'position', 'value' followed by one line per value with its
        0-based index) or None if the data was written to a file
    """
    lines = [f'position{seperator}value']
    lines.extend(f'{idx}{seperator}{stat_val}' for idx, stat_val in enumerate(stat_data))
    text = '\n'.join(lines)

    if path is None:
        return text
    _check_and_create_path(path)
    with open(path, 'w', encoding='utf-8') as out_file:
        out_file.write(text)
    return None


def orf(orf_dict: dict, chrom: str, path: str | None = None) -> str | None:
    """
    Exports the ORF dictionary to a .bed file.

    Each ORF is written as one tab-separated line: chrom, start and end of the first entry in 'location',
    ORF id, conservation (two decimals) and strand.

    :param orf_dict: Dictionary containing ORF information. Every entry has to provide the keys 'location',
        'frame', 'strand', 'conservation' and 'internal'.
    :param chrom: Reference name
    :param path: Path to the output .bed file.
    :return: bed formatted string or None if the data was written to a file
    :raises ValueError: if the dictionary is empty or an entry is missing required keys.
    """
    if not orf_dict:
        raise ValueError("The ORF dictionary is empty. Nothing to export.")

    # check every entry independent of the key order
    required_keys = {'location', 'frame', 'strand', 'conservation', 'internal'}
    for orf_data in orf_dict.values():
        if not isinstance(orf_data, dict) or not required_keys.issubset(orf_data):
            raise ValueError("The ORF dictionary has not the right format.")

    lines = [
        f"{chrom}\t{orf_data['location'][0][0]}\t{orf_data['location'][0][1]}\t{orf_id}\t{orf_data['conservation']:.2f}\t{orf_data['strand']}"
        for orf_id, orf_data in orf_dict.items()
    ]
    text = '\n'.join(lines)

    if path is None:
        return text
    _check_and_create_path(path)
    with open(path, 'w', encoding='utf-8') as out_file:
        out_file.write(text)
    return None


def character_freq(char_dict: dict, seperator: str = '\t', path: str | None = None) -> str | None:
    """
    Export a character frequency dictionary to tabular or csv format.

    The entry with the key 'total' and gap characters ('-') are not part of the exported table.

    :param char_dict: Dictionary containing the character frequencies.
    :param seperator: separator for the table e.g. tab or comma
    :param path: Path to output table.

    :return: A string containing the character frequency table or None if the data was written to a file.
    :raises ValueError: if the input is not a dictionary of dictionaries or contains invalid keys.
    """

    def _validate():
        if not isinstance(char_dict, dict):
            raise ValueError('Data must be a dictionary.')
        allowed_stats = ('counts', '% of alignment', '% of non-gapped')
        for chars in char_dict.values():
            if not isinstance(chars, dict):
                raise ValueError('Data must be a dictionary.')
            for char, char_stats in chars.items():
                if char not in config.POSSIBLE_CHARS:
                    raise ValueError(f'The key {char} is invalid.')
                if not isinstance(char_stats, dict):
                    raise ValueError('Data must be a dictionary.')
                for stat_name in char_stats:
                    if stat_name not in allowed_stats:
                        raise ValueError(f'The key "{stat_name}" is invalid.')

    # validate input
    _validate()

    lines = [f'sequence{seperator}char{seperator}counts{seperator}% of non-gapped']
    for seq_id, chars in char_dict.items():
        if seq_id == 'total':
            continue
        for char, char_stats in chars.items():
            if char == '-':
                continue
            lines.append(
                f'{seq_id}{seperator}{char}{seperator}{char_stats["counts"]}{seperator}{char_stats["% of non-gapped"]}'
            )
    text = '\n'.join(lines)

    # export data
    if path is None:
        return text
    _check_and_create_path(path)
    with open(path, 'w', encoding='utf-8') as out_file:
        out_file.write(text)
    return None


def percent_recovery(rec_dict: dict, seperator: str = '\t', path: str | None = None) -> str | None:
    """
    Export percent_recovery dictionary to tabular or csv format.

    :param rec_dict: Dictionary mapping sequence ids (strings) to their percent recovery (numbers).
    :param seperator: separator for the table e.g. tab or comma
    :param path: Path to output table.

    :return: A string containing the percent recovery table or None if the data was written to a file.
    :raises ValueError: if the input is not a dictionary or contains invalid keys or values.
    """
    def _validate():
        if not isinstance(rec_dict, dict):
            raise ValueError('Data must be a dictionary.')
        for key, value in rec_dict.items():
            if not isinstance(key, str):
                raise ValueError(f'The key {key} is invalid.')
            # accept ints and float subclasses as well, but no booleans
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                raise ValueError(f'The value {value} is invalid.')

    # validate input
    _validate()

    lines = [f'sequence{seperator}% recovery']
    lines.extend(f'{key}{seperator}{value}' for key, value in rec_dict.items())
    text = '\n'.join(lines)

    # export data
    if path is None:
        return text
    _check_and_create_path(path)
    with open(path, 'w', encoding='utf-8') as out_file:
        out_file.write(text)
    return None
26def snps(snp_dict: dict, format_type: str = 'vcf', path: str | None = None) -> str | None | ValueError: 27 """ 28 Export a SNP dictionary to a VCF or tabular format. Importantly, the input dictionary has to be in the standard 29 format that MSAexplorer produces. 30 31 :param snp_dict: Dictionary containing SNP positions and variant information. 32 :param format_type: Format type ('vcf' or 'tabular'). Default is 'vcf'. 33 :param path: Path to output VCF or tabular format. (optional) 34 :return: A string containing the SNP data in the requested format. 35 :raises ValueError: if the input dictionary is missing required keys or format_type is invalid. 36 """ 37 38 def _validate(): 39 if not isinstance(snp_dict, dict): 40 raise ValueError('Input SNP data must be a dictionary.') 41 for key in ['#CHROM', 'POS']: 42 if key not in snp_dict: 43 raise ValueError(f"Missing required key '{key}' in SNP data.") 44 if not isinstance(snp_dict['POS'], dict): 45 raise ValueError('Expected the \'POS\' key to contain a dictionary of positions.') 46 if format_type not in ['vcf', 'tabular']: 47 raise ValueError('Invalid format_type.') 48 _check_and_create_path(path) 49 50 def _vcf_format(snp_dict: dict) -> list: 51 """ 52 Produce vcf formatted SNP data. 53 :param snp_dict: dictionary containing SNP positions and variant information. 54 :return: list of lines to write 55 """ 56 output_lines = [] 57 # VCF header 58 output_lines.append('##fileformat=VCFv4.2') 59 output_lines.append('##source=MSAexplorer') 60 output_lines.append('#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO') 61 # process each SNP position in sorted order 62 for pos in sorted(snp_dict['POS'].keys()): 63 pos_info = snp_dict['POS'][pos] 64 ref = pos_info.get('ref', '.') 65 alt_dict = pos_info.get('ALT', {}) 66 # Create comma-separated list of alternative alleles 67 alt_alleles = ",".join(alt_dict.keys()) if alt_dict else "." 
68 # Prepare INFO field: include allele frequencies and sequence IDs 69 afs = [] 70 seq_ids = [] 71 for alt, details in alt_dict.items(): 72 af = details.get('AF', 0) 73 afs.append(str(af)) 74 seq_ids.append("|".join(details.get('SEQ_ID', []))) 75 info_fields = [] 76 if afs: 77 info_fields.append("AF=" + ",".join(afs)) 78 if seq_ids: 79 info_fields.append("SEQ_ID=" + ",".join(seq_ids)) 80 info = ";".join(info_fields) if info_fields else "." 81 82 # VCF is 1-indexed; we assume pos is 0-indexed and add 1 83 line = f"{snp_dict['#CHROM']}\t{pos + 1}\t.\t{ref}\t{alt_alleles}\t.\t.\t{info}" 84 output_lines.append(line) 85 86 return output_lines 87 88 def _tabular_format(snp_dict: dict) -> list: 89 """ 90 Produce tabular formatted SNP data. 91 92 :param snp_dict: dictionary containing SNP positions and variant information. 93 :return: list of lines to write 94 """ 95 output_lines = [] 96 # Create a header for the tabular output 97 output_lines.append('CHROM\tPOS\tREF\tALT\tAF\tSEQ_ID') 98 99 # Process each SNP position and each alternative allele 100 for pos in sorted(snp_dict['POS'].keys()): 101 pos_info = snp_dict['POS'][pos] 102 ref = pos_info.get('ref', '.') 103 alt_dict = pos_info.get('ALT', {}) 104 for alt, details in alt_dict.items(): 105 af = details.get('AF', 0) 106 seq_id = ",".join(details.get('SEQ_ID', [])) 107 output_lines.append(f"{snp_dict['#CHROM']}\t{pos + 1}\t{ref}\t{alt}\t{af}\t{seq_id}") 108 109 return output_lines 110 111 # validate correct input format 112 _validate() 113 114 # generate line data 115 if format_type == 'vcf': 116 lines = _vcf_format(snp_dict) 117 else: 118 lines = _tabular_format(snp_dict) 119 120 # export to file or return plain text 121 if path is not None: 122 out_path = f"{path}.{format_type}" 123 with open(out_path, 'w') as out_file: 124 out_file.write('\n'.join(lines)) 125 else: 126 return '\n'.join(lines)
Export a SNP dictionary to a VCF or tabular format. Importantly, the input dictionary has to be in the standard format that MSAexplorer produces.
Parameters
- snp_dict: Dictionary containing SNP positions and variant information.
- format_type: Format type ('vcf' or 'tabular'). Default is 'vcf'.
- path: Path to the output file without extension; the format type (`.vcf` or `.tabular`) is appended as extension. (optional)
Returns
A string containing the SNP data in the requested format.
Raises
- ValueError: if the input dictionary is missing required keys or format_type is invalid.
129def fasta(sequence: str | dict, header: str | None = None, path: str | None = None) -> str | None: 130 """ 131 Export a fasta sequence from str or alignment in dictionary format to either a string or save directly to file. 132 The alignment format must have headers as keys and the corresponding sequence as values. 133 :param sequence: sequence to export 134 :param header: optional header file 135 :param path: path to save the file 136 :return: fasta formatted string 137 """ 138 def _validate_sequence(seq: str): 139 if not set(seq).issubset(set(config.POSSIBLE_CHARS)): 140 raise ValueError(f'Sequence contains invalid characters. Detected chars: {set(seq)}') 141 142 _check_and_create_path(path) 143 fasta_formated_sequence = '' 144 145 if type(sequence) is str: 146 _validate_sequence(sequence) 147 fasta_formated_sequence = f'>{header}\n{sequence}' 148 elif type(sequence) is dict: 149 for header, sequence in sequence.items(): 150 if type(sequence) is not str: 151 raise ValueError(f'Sequences in the dictionary must be strings.') 152 _validate_sequence(sequence) 153 fasta_formated_sequence = f'{fasta_formated_sequence}\n>{header}\n{sequence}' if fasta_formated_sequence != '' else f'>{header}\n{sequence}' 154 155 if path is not None: 156 with open(path, 'w') as out_file: 157 out_file.write(fasta_formated_sequence) 158 else: 159 return fasta_formated_sequence
Export a fasta sequence from str or alignment in dictionary format to either a string or save directly to file. The alignment format must have headers as keys and the corresponding sequence as values.
Parameters
- sequence: sequence to export
- header: optional header for the sequence (only used if the sequence is a string)
- path: path to save the file
Returns
fasta formatted string
162def stats(stat_data: list | ndarray, seperator: str = '\t', path: str | None = None) -> str | None: 163 """ 164 Export a list of stats per nucleotide to tabular or csv format. 165 166 :param stat_data: list of stat values 167 :param seperator: seperator for values and index 168 :param path: path to save the file 169 :return: tabular/csv formatted string 170 """ 171 # ini 172 _check_and_create_path(path) 173 174 lines = [f'position{seperator}value'] 175 176 for idx, stat_val in enumerate(stat_data): 177 lines.append(f'{idx}{seperator}{stat_val}') 178 179 if path is not None: 180 with open(path, 'w') as out_file: 181 out_file.write('\n'.join(lines)) 182 else: 183 return '\n'.join(lines)
Export a list of stats per nucleotide to tabular or csv format.
Parameters
- stat_data: list of stat values
- seperator: separator for values and index
- path: path to save the file
Returns
tabular/csv formatted string
186def orf(orf_dict: dict, chrom: str, path: str | None = None) -> str | ValueError: 187 """ 188 Exports the ORF dictionary to a .bed file. 189 190 :param orf_dict: Dictionary containing ORF information. 191 :param path: Path to the output .bed file. 192 :param : Reference name 193 """ 194 if not orf_dict: 195 raise ValueError("The ORF dictionary is empty. Nothing to export.") 196 else: 197 if list(orf_dict[list(orf_dict.keys())[0]].keys()) != ['location', 'frame', 'strand', 'conservation', 'internal']: 198 raise ValueError("The ORF dictionary has not the right format.") 199 200 _check_and_create_path(path) 201 202 lines = [] 203 204 for orf_id, orf_data in orf_dict.items(): 205 lines.append( 206 f"{chrom}\t{orf_data['location'][0][0]}\t{orf_data['location'][0][1]}\t{orf_id}\t{orf_data['conservation']:.2f}\t{orf_data['strand']}" 207 ) 208 209 if path is not None: 210 with open(path, 'w') as out_file: 211 out_file.write('\n'.join(lines)) 212 else: 213 return '\n'.join(lines)
Exports the ORF dictionary to a .bed file.
Parameters
- orf_dict: Dictionary containing ORF information.
- path: Path to the output .bed file.
- chrom: Reference name
216def character_freq(char_dict: dict, seperator: str = '\t', path: str | None = None) -> str | None | ValueError: 217 """ 218 Export a character frequency dictionary to tabular or csv format. 219 220 :param char_dict: Dictionary containing the character frequencies. 221 :param seperator: seperator for the table e.g. tabular or comma 222 :param path: Path to output table. 223 224 :return: A string containing the character frequency table. 225 :raises ValueError: if the input dictionary is missing required keys or format_type is invalid. 226 """ 227 228 def _validate(): 229 if not isinstance(char_dict, dict): 230 raise ValueError('Data must be a dictionary.') 231 for key, value in char_dict.items(): 232 for key_2, value_2 in value.items(): 233 if key_2 not in config.POSSIBLE_CHARS: 234 raise ValueError(f'The key {key_2} is invalid.') 235 for key_3, value_3 in value_2.items(): 236 if key_3 not in ['counts', '% of alignment', '% of non-gapped']: 237 raise ValueError(f'The key "{key_3}" is invalid.') 238 239 # validate input 240 _validate() 241 242 lines = [F'sequence{seperator}char{seperator}counts{seperator}% of non-gapped'] 243 for key, value in char_dict.items(): 244 if key == 'total': 245 continue 246 for key_2, value_2 in value.items(): 247 if key_2 == '-': 248 continue 249 lines.append(f'{key}{seperator}{key_2}{seperator}{value_2["counts"]}{seperator}{value_2["% of non-gapped"]}') 250 251 # export data 252 if path is not None: 253 with open(path, 'w') as out_file: 254 out_file.write('\n'.join(lines)) 255 else: 256 return '\n'.join(lines)
Export a character frequency dictionary to tabular or csv format.
Parameters
- char_dict: Dictionary containing the character frequencies.
- seperator: separator for the table, e.g. tab or comma
- path: Path to output table.
Returns
A string containing the character frequency table.
Raises
- ValueError: if the input is not a dictionary or contains invalid keys.
259def percent_recovery(rec_dict: dict, seperator: str = '\t', path: str | None = None) -> str | None | ValueError: 260 """ 261 Export percent_recovery dictionary to tabular or csv format. 262 263 :param rec_dict: Dictionary containing the character frequencies. 264 :param seperator: seperator for the table e.g. tabular or comma 265 :param path: Path to output table. 266 267 :return: A string containing the character frequency table. 268 :raises ValueError: if the input dictionary is missing required keys or format_type is invalid. 269 """ 270 def _validate(): 271 if not isinstance(rec_dict, dict): 272 raise ValueError('Data must be a dictionary.') 273 for key, value in rec_dict.items(): 274 if type(key) != str: 275 raise ValueError(f'The key {key} is invalid.') 276 elif type(value) != float: 277 raise ValueError(f'The value {value} is invalid.') 278 279 # validate input 280 _validate() 281 282 lines = [F'sequence{seperator}% recovery'] 283 for key, value in rec_dict.items(): 284 lines.append( 285 f'{key}{seperator}{value}') 286 287 # export data 288 if path is not None: 289 with open(path, 'w') as out_file: 290 out_file.write('\n'.join(lines)) 291 else: 292 return '\n'.join(lines)
Export percent_recovery dictionary to tabular or csv format.
Parameters
- rec_dict: Dictionary containing the percent recovery per sequence.
- seperator: separator for the table, e.g. tab or comma
- path: Path to output table.
Returns
A string containing the percent recovery table.
Raises
- ValueError: if the input is not a dictionary or contains invalid keys or values.