FastqSync.scala 9.56 KB
Newer Older
bow's avatar
bow committed
1
2
3
4
5
6
7
8
9
10
/**
 * Copyright (c) 2014 Leiden University Medical Center - Sequencing Analysis Support Core <sasc@lumc.nl>
 * @author Wibowo Arindrarto <w.arindrarto@lumc.nl>
 *
 * This tool is a port of a Python implementation written by Martijn Vermaat[1]
 *
 * [1] https://github.com/martijnvermaat/bio-playground/blob/master/sync-paired-end-reads/sync_paired_end_reads.py
 */
package nl.lumc.sasc.biopet.tools

11
import java.io.File
bow's avatar
bow committed
12
13
14
import scala.io.Source
import scala.util.matching.Regex

15
16
17
import scala.annotation.tailrec
import scala.collection.JavaConverters._

bow's avatar
bow committed
18
19
import argonaut._, Argonaut._
import scalaz._, Scalaz._
20
import htsjdk.samtools.fastq.{ AsyncFastqWriter, BasicFastqWriter, FastqReader, FastqRecord }
bow's avatar
bow committed
21
import org.broadinstitute.gatk.utils.commandline.{ Input, Output }
22

bow's avatar
bow committed
23
24
25
26
27
28
29
30
31
32
33
34
35
import nl.lumc.sasc.biopet.core.BiopetJavaCommandLineFunction
import nl.lumc.sasc.biopet.core.ToolCommand
import nl.lumc.sasc.biopet.core.config.Configurable

/**
 * FastqSync function class for usage in Biopet pipelines
 *
 * @param root Configuration object for the pipeline
 */
class FastqSync(val root: Configurable) extends BiopetJavaCommandLineFunction {

  javaMainClass = getClass.getName

bow's avatar
bow committed
36
  @Input(doc = "Original FASTQ file (read 1 or 2)", shortName = "r", required = true)
bow's avatar
bow committed
37
  var refFastq: File = null
bow's avatar
bow committed
38
39

  @Input(doc = "Input read 1 FASTQ file", shortName = "i", required = true)
bow's avatar
bow committed
40
  var inputFastq1: File = null
bow's avatar
bow committed
41
42

  @Input(doc = "Input read 2 FASTQ file", shortName = "j", required = true)
bow's avatar
bow committed
43
  var inputFastq2: File = null
bow's avatar
bow committed
44
45

  @Output(doc = "Output read 1 FASTQ file", shortName = "o", required = true)
bow's avatar
bow committed
46
  var outputFastq1: File = null
bow's avatar
bow committed
47
48

  @Output(doc = "Output read 2 FASTQ file", shortName = "p", required = true)
bow's avatar
bow committed
49
  var outputFastq2: File = null
bow's avatar
bow committed
50

51
  @Output(doc = "Sync statistics", required = true)
bow's avatar
bow committed
52
  var outputStats: File = null
bow's avatar
bow committed
53

Peter van 't Hof's avatar
Peter van 't Hof committed
54
55
  override val defaultVmem = "5G"

bow's avatar
bow committed
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
  // executed command line
  override def commandLine =
    super.commandLine +
      required("-r", refFastq) +
      required("-i", inputFastq1) +
      required("-j", inputFastq2) +
      required("-o", outputFastq1) +
      required("-p", outputFastq2) + " > " +
      required(outputStats)

  // summary statistics
  def summary: Json = {

    val regex = new Regex("""Filtered (\d*) reads from first read file.
                            |Filtered (\d*) reads from second read file.
                            |Synced read files contain (\d*) reads.""".stripMargin,
      "R1", "R2", "RL")

    val (countFilteredR1, countFilteredR2, countRLeft) =
      if (outputStats.exists) {
        val text = Source
          .fromFile(outputStats)
          .getLines()
          .mkString("\n")
        regex.findFirstMatchIn(text) match {
          case None         => (0, 0, 0)
          case Some(rmatch) => (rmatch.group("R1").toInt, rmatch.group("R2").toInt, rmatch.group("RL").toInt)
        }
      } else (0, 0, 0)

    ("num_reads_discarded_R1" := countFilteredR1) ->:
      ("num_reads_discarded_R2" := countFilteredR2) ->:
      ("num_reads_kept" := countRLeft) ->:
      jEmptyObject
  }
bow's avatar
bow committed
91
92
93
94
}

object FastqSync extends ToolCommand {

bow's avatar
bow committed
95
96
97
  /** Regex for capturing read ID ~ taking into account its read pair mark (if present) */
  private val idRegex = "[_/][12]\\s??|\\s".r

98
  /** Implicit class to allow for lazy retrieval of FastqRecord ID without any read pair mark */
99
  private implicit class FastqPair(fq: FastqRecord) {
bow's avatar
bow committed
100
    lazy val fragId = idRegex.split(fq.getReadHeader)(0)
101
102
103
104
105
106
107
108
109
110
111
  }

  /**
   * Filters out FastqRecord that are not present in the input iterators, using
   * a reference sequence object
   *
   * @param pre FastqReader over reference FASTQ file
   * @param seqA FastqReader over read 1
   * @param seqB FastqReader over read 2
   * @return
   */
112
113
  def syncFastq(pre: FastqReader, seqA: FastqReader, seqB: FastqReader,
                seqOutA: AsyncFastqWriter, seqOutB: AsyncFastqWriter): (Long, Long, Long) = {
114
115
116
117
118
119
120
121
122
123
124
125
126
    // counters for discarded A and B seqections + total kept
    // NOTE: we are reasigning values to these variables in the recursion below
    var (numDiscA, numDiscB, numKept) = (0, 0, 0)

    /**
     * Syncs read pairs recursively
     *
     * @param pre Reference sequence, assumed to be a superset of both seqA and seqB
     * @param seqA Sequence over read 1
     * @param seqB Sequence over read 2
     * @return
     */
    @tailrec def syncIter(pre: Stream[FastqRecord],
127
                          seqA: Stream[FastqRecord], seqB: Stream[FastqRecord]): Unit =
128
129
130
      (pre.headOption, seqA.headOption, seqB.headOption) match {
        // where the magic happens!
        case (Some(r), Some(a), Some(b)) =>
bow's avatar
bow committed
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
          val (nextA, nextB) = (a.fragId == r.fragId, b.fragId == r.fragId) match {
            // all IDs are equal to ref
            case (true, true) =>
              numKept += 1
              seqOutA.write(a)
              seqOutB.write(b)
              (seqA.tail, seqB.tail)
            // B not equal to ref and A is equal, then we discard A and progress
            case (true, false) =>
              numDiscA += 1
              (seqA.tail, seqB)
            // A not equal to ref and B is equal, then we discard B and progress
            case (false, true) =>
              numDiscB += 1
              (seqA, seqB.tail)
            case (false, false) =>
              (seqA, seqB)
148
149
          }
          syncIter(pre.tail, nextA, nextB)
150
        // recursion base case: both iterators have been exhausted
151
        case (_, None, None) => ;
152
153
154
155
156
157
        // illegal state: reference sequence exhausted but not seqA or seqB
        case (None, Some(_), _) | (None, _, Some(_)) =>
          throw new NoSuchElementException("Reference record stream shorter than expected")
        // keep recursion going if A still has items (we want to count how many)
        case (_, _, None) =>
          numDiscA += 1
158
          syncIter(pre.tail, seqA.tail, seqB)
159
160
161
        // like above but for B
        case (_, None, _) =>
          numDiscB += 1
162
          syncIter(pre.tail, seqA, seqB.tail)
163
164
      }

165
    syncIter(pre.iterator.asScala.toStream, seqA.iterator.asScala.toStream, seqB.iterator.asScala.toStream)
166

167
    (numDiscA, numDiscB, numKept)
168
169
  }

bow's avatar
bow committed
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
  /** Function to merge this tool's summary with summaries from other objects */
  // TODO: refactor this into the object? At least make it work on the summary object
  def mergeSummaries(jsons: List[Json]): Json = {

    val (read1FilteredCount, read2FilteredCount, readsLeftCount) = jsons
      // extract the values we require from each JSON object into tuples
      .map {
        case json =>
          (json.field("num_reads_discarded_R1").get.numberOrZero.toInt,
            json.field("num_reads_discarded_R2").get.numberOrZero.toInt,
            json.field("num_reads_kept").get.numberOrZero.toInt)
      }
      // reduce the tuples
      .reduceLeft {
        (x: (Int, Int, Int), y: (Int, Int, Int)) =>
          (x._1 + y._1, x._2 + y._2, x._3 + y._3)
      }

    ("num_reads_discarded_R1" := read1FilteredCount) ->:
      ("num_reads_discarded_R2" := read2FilteredCount) ->:
      ("num_reads_kept" := readsLeftCount) ->:
      jEmptyObject
  }

194
195
196
197
198
  case class Args(refFastq: File = new File(""),
                  inputFastq1: File = new File(""),
                  inputFastq2: File = new File(""),
                  outputFastq1: File = new File(""),
                  outputFastq2: File = new File("")) extends AbstractArgs
bow's avatar
bow committed
199
200
201
202
203

  class OptParser extends AbstractOptParser {

    head(
      s"""
bow's avatar
bow committed
204
205
206
207
        |$commandName - Sync paired-end FASTQ files.
        |
        |This tool works with gzipped or non-gzipped FASTQ files. The output
        |file will be gzipped when the input is also gzipped.
bow's avatar
bow committed
208
209
      """.stripMargin)

210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
    opt[File]('r', "ref") required () valueName "<fastq>" action { (x, c) =>
      c.copy(refFastq = x)
    } validate {
      x => if (x.exists) success else failure("Reference FASTQ file not found")
    } text "Reference FASTQ file"

    opt[File]('i', "in1") required () valueName "<fastq>" action { (x, c) =>
      c.copy(inputFastq1 = x)
    } validate {
      x => if (x.exists) success else failure("Input FASTQ file 1 not found")
    } text "Input FASTQ file 1"

    opt[File]('j', "in2") required () valueName "<fastq[.gz]>" action { (x, c) =>
      c.copy(inputFastq2 = x)
    } validate {
      x => if (x.exists) success else failure("Input FASTQ file 2 not found")
    } text "Input FASTQ file 2"

    opt[File]('o', "out1") required () valueName "<fastq[.gz]>" action { (x, c) =>
      c.copy(outputFastq1 = x)
    } text "Output FASTQ file 1"

    opt[File]('p', "out2") required () valueName "<fastq>" action { (x, c) =>
      c.copy(outputFastq2 = x)
    } text "Output FASTQ file 2"
bow's avatar
bow committed
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
  }

  /**
   * Parses the command line argument
   *
   * @param args Array of arguments
   * @return
   */
  def parseArgs(args: Array[String]): Args = new OptParser()
    .parse(args, Args())
    .getOrElse(sys.exit(1))

  def main(args: Array[String]): Unit = {

    val commandArgs: Args = parseArgs(args)

bow's avatar
bow committed
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
    val refReader = new FastqReader(commandArgs.refFastq)
    val AReader = new FastqReader(commandArgs.inputFastq1)
    val BReader = new FastqReader(commandArgs.inputFastq2)
    val AWriter = new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq1), 3000)
    val BWriter = new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq2), 3000)

    try {
      val (numDiscA, numDiscB, numKept) = syncFastq(refReader, AReader, BReader, AWriter, BWriter)
      println(s"Filtered $numDiscA reads from first read file.")
      println(s"Filtered $numDiscB reads from second read file.")
      println(s"Synced files contain $numKept reads.")
    } finally {
      refReader.close()
      AReader.close()
      BReader.close()
      AWriter.close()
      BWriter.close()
    }
bow's avatar
bow committed
269
270
  }
}