FastqSync.scala 9.31 KB
Newer Older
bow's avatar
bow committed
1
/**
bow's avatar
bow committed
2
 * Copyright (c) 2014 Leiden University Medical Center - Sequencing Analysis Support Core <sasc@lumc.nl>
bow's avatar
bow committed
3
4
5
6
7
8
9
10
 * @author Wibowo Arindrarto <w.arindrarto@lumc.nl>
 *
 * This tool is a port of a Python implementation written by Martijn Vermaat[1]
 *
 * [1] https://github.com/martijnvermaat/bio-playground/blob/master/sync-paired-end-reads/sync_paired_end_reads.py
 */
package nl.lumc.sasc.biopet.tools

11
import java.io.File
bow's avatar
bow committed
12
13
14
import scala.io.Source
import scala.util.matching.Regex

15
16
17
import scala.annotation.tailrec
import scala.collection.JavaConverters._

bow's avatar
bow committed
18
19
import argonaut._, Argonaut._
import scalaz._, Scalaz._
20
import htsjdk.samtools.fastq.{ AsyncFastqWriter, BasicFastqWriter, FastqReader, FastqRecord }
bow's avatar
bow committed
21
import org.broadinstitute.gatk.utils.commandline.{ Input, Output }
22

bow's avatar
bow committed
23
24
25
26
27
28
29
30
31
32
33
34
35
import nl.lumc.sasc.biopet.core.BiopetJavaCommandLineFunction
import nl.lumc.sasc.biopet.core.ToolCommand
import nl.lumc.sasc.biopet.core.config.Configurable

/**
 * FastqSync function class for usage in Biopet pipelines
 *
 * @param root Configuration object for the pipeline
 */
class FastqSync(val root: Configurable) extends BiopetJavaCommandLineFunction {

  javaMainClass = getClass.getName

bow's avatar
bow committed
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
  @Input(doc = "Original FASTQ file (read 1 or 2)", shortName = "r", required = true)
  var refFastq: File = _

  @Input(doc = "Input read 1 FASTQ file", shortName = "i", required = true)
  var inputFastq1: File = _

  @Input(doc = "Input read 2 FASTQ file", shortName = "j", required = true)
  var inputFastq2: File = _

  @Output(doc = "Output read 1 FASTQ file", shortName = "o", required = true)
  var outputFastq1: File = _

  @Output(doc = "Output read 2 FASTQ file", shortName = "p", required = true)
  var outputFastq2: File = _

51
  @Output(doc = "Sync statistics", required = true)
bow's avatar
bow committed
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
  var outputStats: File = _

  // executed command line
  override def commandLine =
    super.commandLine +
      required("-r", refFastq) +
      required("-i", inputFastq1) +
      required("-j", inputFastq2) +
      required("-o", outputFastq1) +
      required("-p", outputFastq2) + " > " +
      required(outputStats)

  // summary statistics
  def summary: Json = {

    val regex = new Regex("""Filtered (\d*) reads from first read file.
                            |Filtered (\d*) reads from second read file.
                            |Synced read files contain (\d*) reads.""".stripMargin,
      "R1", "R2", "RL")

    val (countFilteredR1, countFilteredR2, countRLeft) =
      if (outputStats.exists) {
        val text = Source
          .fromFile(outputStats)
          .getLines()
          .mkString("\n")
        regex.findFirstMatchIn(text) match {
          case None         => (0, 0, 0)
          case Some(rmatch) => (rmatch.group("R1").toInt, rmatch.group("R2").toInt, rmatch.group("RL").toInt)
        }
      } else (0, 0, 0)

    ("num_reads_discarded_R1" := countFilteredR1) ->:
      ("num_reads_discarded_R2" := countFilteredR2) ->:
      ("num_reads_kept" := countRLeft) ->:
      jEmptyObject
  }
bow's avatar
bow committed
89
90
91
92
}

object FastqSync extends ToolCommand {

bow's avatar
bow committed
93
94
95
  /** Regex for capturing read ID ~ taking into account its read pair mark (if present) */
  private val idRegex = "[_/][12]\\s??|\\s".r

96
  /** Implicit class to allow for lazy retrieval of FastqRecord ID without any read pair mark */
97
  private implicit class FastqPair(fq: FastqRecord) {
bow's avatar
bow committed
98
    lazy val fragId = idRegex.split(fq.getReadHeader)(0)
99
100
101
102
103
104
105
106
107
108
109
  }

  /**
   * Filters out FastqRecord that are not present in the input iterators, using
   * a reference sequence object
   *
   * @param pre FastqReader over reference FASTQ file
   * @param seqA FastqReader over read 1
   * @param seqB FastqReader over read 2
   * @return
   */
110
111
  def syncFastq(pre: FastqReader, seqA: FastqReader, seqB: FastqReader,
                seqOutA: AsyncFastqWriter, seqOutB: AsyncFastqWriter): (Long, Long, Long) = {
112
113
114
115
116
117
118
119
120
121
122
123
124
    // counters for discarded A and B seqections + total kept
    // NOTE: we are reasigning values to these variables in the recursion below
    var (numDiscA, numDiscB, numKept) = (0, 0, 0)

    /**
     * Syncs read pairs recursively
     *
     * @param pre Reference sequence, assumed to be a superset of both seqA and seqB
     * @param seqA Sequence over read 1
     * @param seqB Sequence over read 2
     * @return
     */
    @tailrec def syncIter(pre: Stream[FastqRecord],
125
                          seqA: Stream[FastqRecord], seqB: Stream[FastqRecord]): Unit =
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
      (pre.headOption, seqA.headOption, seqB.headOption) match {
        // where the magic happens!
        case (Some(r), Some(a), Some(b)) =>
          // value of A iterator in the next recursion
          val nextA =
            // hold A if its head is not equal to reference
            if (a.fragId != r.fragId) {
              if (b.fragId == r.fragId) numDiscB += 1
              seqA
              // otherwise, go to next item
            } else seqA.tail
          // like A above
          val nextB =
            if (b.fragId != r.fragId) {
              if (a.fragId == r.fragId) numDiscA += 1
              seqB
            } else seqB.tail
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
          // write only if all IDs are equal
          if (a.fragId == r.fragId && b.fragId == r.fragId) {
            numKept += 1
            seqOutA.write(a)
            seqOutB.write(b)
          }
          syncIter(pre.tail, nextA, nextB)
        // recursion base case: both iterators have been exhausted
        case (_, None, None) => ;
        // illegal state: reference sequence exhausted but not seqA or seqB
        case (None, Some(_), _) | (None, _, Some(_)) =>
          throw new NoSuchElementException("Reference record stream shorter than expected")
        // keep recursion going if A still has items (we want to count how many)
        case (_, _, None) =>
          numDiscA += 1
          syncIter(pre.tail, seqA.tail, seqB)
        // like above but for B
        case (_, None, _) =>
          numDiscB += 1
          syncIter(pre.tail, seqA, seqB.tail)
163
164
      }

165
    syncIter(pre.iterator.asScala.toStream, seqA.iterator.asScala.toStream, seqB.iterator.asScala.toStream)
bow's avatar
bow committed
166
167
    seqOutA.close()
    seqOutB.close()
168

169
    (numDiscA, numDiscB, numKept)
170
171
  }

bow's avatar
bow committed
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
  /** Function to merge this tool's summary with summaries from other objects */
  // TODO: refactor this into the object? At least make it work on the summary object
  def mergeSummaries(jsons: List[Json]): Json = {

    val (read1FilteredCount, read2FilteredCount, readsLeftCount) = jsons
      // extract the values we require from each JSON object into tuples
      .map {
        case json =>
          (json.field("num_reads_discarded_R1").get.numberOrZero.toInt,
            json.field("num_reads_discarded_R2").get.numberOrZero.toInt,
            json.field("num_reads_kept").get.numberOrZero.toInt)
      }
      // reduce the tuples
      .reduceLeft {
        (x: (Int, Int, Int), y: (Int, Int, Int)) =>
          (x._1 + y._1, x._2 + y._2, x._3 + y._3)
      }

    ("num_reads_discarded_R1" := read1FilteredCount) ->:
      ("num_reads_discarded_R2" := read2FilteredCount) ->:
      ("num_reads_kept" := readsLeftCount) ->:
      jEmptyObject
  }

196
197
198
199
200
  case class Args(refFastq: File = new File(""),
                  inputFastq1: File = new File(""),
                  inputFastq2: File = new File(""),
                  outputFastq1: File = new File(""),
                  outputFastq2: File = new File("")) extends AbstractArgs
bow's avatar
bow committed
201
202
203
204
205

  class OptParser extends AbstractOptParser {

    head(
      s"""
bow's avatar
bow committed
206
207
208
209
        |$commandName - Sync paired-end FASTQ files.
        |
        |This tool works with gzipped or non-gzipped FASTQ files. The output
        |file will be gzipped when the input is also gzipped.
bow's avatar
bow committed
210
211
      """.stripMargin)

212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
    opt[File]('r', "ref") required () valueName "<fastq>" action { (x, c) =>
      c.copy(refFastq = x)
    } validate {
      x => if (x.exists) success else failure("Reference FASTQ file not found")
    } text "Reference FASTQ file"

    opt[File]('i', "in1") required () valueName "<fastq>" action { (x, c) =>
      c.copy(inputFastq1 = x)
    } validate {
      x => if (x.exists) success else failure("Input FASTQ file 1 not found")
    } text "Input FASTQ file 1"

    opt[File]('j', "in2") required () valueName "<fastq[.gz]>" action { (x, c) =>
      c.copy(inputFastq2 = x)
    } validate {
      x => if (x.exists) success else failure("Input FASTQ file 2 not found")
    } text "Input FASTQ file 2"

    opt[File]('o', "out1") required () valueName "<fastq[.gz]>" action { (x, c) =>
      c.copy(outputFastq1 = x)
    } text "Output FASTQ file 1"

    opt[File]('p', "out2") required () valueName "<fastq>" action { (x, c) =>
      c.copy(outputFastq2 = x)
    } text "Output FASTQ file 2"
bow's avatar
bow committed
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
  }

  /**
   * Parses the command line argument
   *
   * @param args Array of arguments
   * @return
   */
  def parseArgs(args: Array[String]): Args = new OptParser()
    .parse(args, Args())
    .getOrElse(sys.exit(1))

  def main(args: Array[String]): Unit = {

    val commandArgs: Args = parseArgs(args)

253
    val (numDiscA, numDiscB, numKept) = syncFastq(
254
255
      new FastqReader(commandArgs.refFastq),
      new FastqReader(commandArgs.inputFastq1),
256
      new FastqReader(commandArgs.inputFastq2),
257
      new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq1), 3000),
258
259
260
261
262
      new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq2), 3000))

    println(s"Filtered $numDiscA reads from first read file.")
    println(s"Filtered $numDiscB reads from second read file.")
    println(s"Synced $numKept files contain %d reads.")
bow's avatar
bow committed
263
264
  }
}