Source code for tool.fastq_splitter

"""
.. See the NOTICE file distributed with this work for additional information
   regarding copyright ownership.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
"""

from __future__ import print_function

import os
import sys

from utils import logger

try:
    if hasattr(sys, '_run_from_cmdl') is True:
        raise ImportError
    from pycompss.api.parameter import FILE_IN, FILE_OUT, IN, OUT
    from pycompss.api.task import task
    # from pycompss.api.api import compss_wait_on
except ImportError:
    logger.warn("[Warning] Cannot import \"pycompss\" API packages.")
    logger.warn("          Using mock decorators.")

    from utils.dummy_pycompss import FILE_IN, FILE_OUT, IN, OUT  # pylint: disable=ungrouped-imports
    from utils.dummy_pycompss import task  # pylint: disable=ungrouped-imports
    # from utils.dummy_pycompss import compss_wait_on  # pylint: disable=ungrouped-imports

from basic_modules.tool import Tool
from basic_modules.metadata import Metadata

from tool.fastqreader import fastqreader
from tool.common import common


# ------------------------------------------------------------------------------

[docs]class fastq_splitter(Tool): # pylint: disable=invalid-name """ Script for splitting up FASTQ files into manageable chunks """ def __init__(self, configuration=None): """ Initialise the tool with its configuration. Parameters ---------- configuration : dict a dictionary containing parameters that define how the operation should be carried out, which are specific to each Tool. """ logger.info("FASTQ Splitter") Tool.__init__(self) if configuration is None: configuration = {} self.configuration.update(configuration)
[docs] @task( in_file1=FILE_IN, tag=IN, out_file=FILE_OUT, files_out=OUT, returns=list) def single_splitter(self, in_file1, out_file, tag='tmp'): # pylint: disable=no-self-use """ Function to divide the FastQ files into separate sub files of 1000000 sequences so that the aligner can run in parallel. Parameters ---------- in_file1 : str Location of first FASTQ file tag : str DEFAULT = tmp Tag used to identify the files. Useful if this is getting run manually on a single machine multiple times to prevent collisions of file names Returns ------- Returns: Returns a list of the files that have been generated. Each sub list containing the two paired end files for that subset. paired_files : list List of lists of pair end files. Each sub list containing the two paired end files for that subset. """ fqr = fastqreader() fqr.openFastQ(in_file1) file_loc_1_tmp = fqr.createOutputFiles(tag) fastq_chunk_size = 1000000 if "fastq_chunk_size" in self.configuration: fastq_chunk_size = self.configuration["fastq_chunk_size"] record1 = fqr.next(1) count_r3 = 0 tmp_dir = os.path.split(file_loc_1_tmp)[0] files_out = [[os.path.split(file_loc_1_tmp)[1]]] while fqr.eof(1) is False: fqr.writeOutput(record1, 1) record1 = fqr.next(1) count_r3 += 1 if count_r3 % fastq_chunk_size == 0: file_loc_1_new = fqr.incrementOutputFiles() files_out.append([os.path.split(file_loc_1_new)[1]]) fqr.closeFastQ() fqr.closeOutputFiles() untar_idx = True if "no-untar" in self.configuration and self.configuration["no-untar"] is True: untar_idx = False if untar_idx is True: if os.path.isfile(out_file): os.remove(out_file) output_file_pregz = out_file.replace('.tar.gz', '.tar') common.tar_folder(tmp_dir, output_file_pregz) common.zip_file(output_file_pregz, 2) return files_out
[docs] @task( in_file1=FILE_IN, in_file2=FILE_IN, tag=IN, out_file=FILE_OUT, files_out=OUT, returns=list) def paired_splitter(self, in_file1, in_file2, out_file, tag='tmp'): # pylint: disable=no-self-use,too-many-locals,too-many-statements """ Function to divide the paired-end FastQ files into separte sub files of 1000000 sequences so that the aligner can run in parallel. Parameters ---------- in_file1 : str Location of first paired end FASTQ file in_file2 : str Location of second paired end FASTQ file tag : str DEFAULT = tmp Tag used to identify the files. Useful if this is getting run manually on a single machine multiple times to prevent collisions of file names Returns ------- Returns: Returns a list of lists of the files that have been generated. Each sub list containing the two paired end files for that subset. paired_files : list List of lists of pair end files. Each sub list containing the two paired end files for that subset. """ fqr = fastqreader() fqr.openFastQ(in_file1, in_file2) file_loc_1_tmp, file_loc_2_tmp = fqr.createOutputFiles(tag) fastq_chunk_size = 1000000 if "fastq_chunk_size" in self.configuration: fastq_chunk_size = self.configuration["fastq_chunk_size"] record1 = fqr.next(1) record2 = fqr.next(2) count_r1 = 0 count_r2 = 0 count_r3 = 0 tmp_dir = os.path.split(file_loc_1_tmp)[0] files_out = [[ os.path.split(file_loc_1_tmp)[1], os.path.split(file_loc_2_tmp)[1] ]] while fqr.eof(1) is False and fqr.eof(2) is False: r1_id = record1["id"].split(" ") r2_id = record2["id"].split(" ") if r1_id[0] == r2_id[0]: fqr.writeOutput(record1, 1) fqr.writeOutput(record2, 2) record1 = fqr.next(1) record2 = fqr.next(2) count_r1 += 1 count_r2 += 1 count_r3 += 1 elif r1_id[0] < r2_id[0]: record1 = fqr.next(1) count_r1 += 1 else: record2 = fqr.next(2) count_r2 += 1 if count_r3 % fastq_chunk_size == 0: file_loc_1_new, file_loc_2_new = fqr.incrementOutputFiles() files_out.append([ os.path.split(file_loc_1_new)[1], os.path.split(file_loc_2_new)[1] ]) fqr.closeFastQ() fqr.closeOutputFiles() untar_idx = True if "no-untar" in self.configuration and self.configuration["no-untar"] is True: untar_idx = False if untar_idx is True: if os.path.isfile(out_file): os.remove(out_file) output_file_pregz = out_file.replace('.tar.gz', '.tar') common.tar_folder(tmp_dir, output_file_pregz) common.zip_file(output_file_pregz, 2) return files_out
[docs] def run(self, input_files, input_metadata, output_files): """ The main function to run the splitting of FASTQ files (single or paired) so that they can aligned in a distributed manner Parameters ---------- input_files : dict List of input fastq file locations metadata : dict output_files : dict Returns ------- output_file : str Location of compressed (.tar.gz) of the split FASTQ files output_names : list List of file names in the compressed file """ sources = [input_files["fastq1"]] if "fastq2" in input_files: sources.append(input_files["fastq2"]) self.paired_splitter( input_files["fastq1"], input_files["fastq2"], output_files["output"] ) else: self.single_splitter( input_files["fastq1"], output_files["output"], ) # results = compss_wait_on(results) fastq_tar_meta = Metadata( data_type=input_metadata["fastq1"].data_type, file_type="TAR", file_path=output_files["output"], sources=sources, taxon_id=input_metadata["fastq1"].taxon_id, meta_data={ "tool": "fastq_splitter" } ) return ( {"output": output_files["output"]}, {"output": fastq_tar_meta} )
# ------------------------------------------------------------------------------