/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support the LUMC SHARK cluster, which runs
 * SGE, but other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that is
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; for commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
package nl.lumc.sasc.biopet.tools

import java.io.{ File, BufferedWriter, FileWriter, OutputStreamWriter }
import scala.io.{ BufferedSource, Source }
import scala.collection.mutable.{ Set => MutSet }

import nl.lumc.sasc.biopet.core.{ ToolCommandFuntion, BiopetJavaCommandLineFunction, ToolCommand }
import nl.lumc.sasc.biopet.core.config.Configurable
import org.broadinstitute.gatk.utils.commandline.{ Input, Output }

/**
 * Biopet wrapper for the [[MergeTables]] command line tool.
 *
 * @param root [[Configurable]] object
 */
class MergeTables(val root: Configurable) extends ToolCommandFuntion {

  javaMainClass = getClass.getName

  override val defaultCoreMemory = 6.0

  /** List of input tabular files */
  @Input(doc = "Input table files", required = true)
  var inputTables: List[File] = List.empty[File]

  /** Output file */
  @Output(doc = "Output merged table", required = true)
  var output: File = null

  // TODO: should be List[Int] really
  /** List of column indices to combine to make a unique identifier per row */
  var idColumnIndices: List[String] = config("id_column_indices", default = List("1"))

  /** Index of column from each tabular file containing the values to be put in the final merged table */
  var valueColumnIndex: Int = config("value_column_index", default = 2)

  /** Name of the identifier column in the output file */
  var idColumnName: Option[String] = config("id_column_name")

  /** Common file extension of all input files */
  var fileExtension: Option[String] = config("file_extension")

  /** Number of header lines from each input file to ignore */
  var numHeaderLines: Option[Int] = config("num_header_lines")

  /** String to use when a value is missing from an input file */
  var fallbackString: Option[String] = config("fallback_string")

  /** Column delimiter of each input file (used for splitting into columns) */
  var delimiter: Option[String] = config("delimiter")
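
  /*
   * Illustrative config snippet (JSON); the keys match the config() calls
   * above, but the exact namespace they live under depends on the pipeline
   * setup, so treat this as a sketch:
   *
   *   "mergetables": {
   *     "id_column_indices": ["1", "2"],
   *     "value_column_index": 3,
   *     "num_header_lines": 1,
   *     "fallback_string": "NA"
   *   }
   */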

  // executed command line
  override def commandLine =
    super.commandLine +
      required("-i", idColumnIndices.mkString(",")) +
      required("-a", valueColumnIndex) +
      optional("-n", idColumnName) +
      optional("-e", fileExtension) +
      optional("-m", numHeaderLines) +
      optional("-f", fallbackString) +
      optional("-d", delimiter) +
      required("-o", output) +
      required("", repeat(inputTables), escape = false)
}
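
/*
 * Illustrative sketch (hypothetical paths): with the default config values the
 * wrapper builds a command along the lines of:
 *
 *   java -cp <biopet-jar> nl.lumc.sasc.biopet.tools.MergeTables \
 *     -i 1 -a 2 -o merged.tsv sampleA.counts sampleB.counts
 *
 * The optional flags (-n, -e, -m, -f, -d) are emitted only when the
 * corresponding config value is set.
 */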

object MergeTables extends ToolCommand {

  /** Type alias for sample name */
  type Sample = String

  /** Type alias for feature name */
  type Feature = String

  /** Type alias for value string */
  type Value = String

  /** Case class for storing input data */
  case class InputTable(name: String, source: BufferedSource)

  /** Processes the current line into a pair of feature identifier and its value */
  def processLine(line: String, idIdces: Seq[Int], valIdx: Int, delimiter: Char = '\t',
                  idSeparator: String = ","): (Feature, Value) = {

    // split on delimiter and remove empty strings
    val split = line
      .split(delimiter)
      .filter(_.nonEmpty)
    val colSize = split.size
    require(idIdces.forall(_ < colSize), "All feature ID indices must be smaller than the number of columns")
    require(valIdx < colSize, "Value index must be smaller than the number of columns")

    val featureId = idIdces.map(split(_)).mkString(idSeparator)
    (featureId, split(valIdx))
  }
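
  // Illustrative sketch of processLine on tab-delimited lines:
  //   processLine("geneX\t100", idIdces = Seq(0), valIdx = 1)
  //   // -> ("geneX", "100")
  //   processLine("geneX\tchr1\t100", idIdces = Seq(0, 1), valIdx = 2)
  //   // -> ("geneX,chr1", "100")   (ID columns joined with idSeparator)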

  /** Merges multiple tables into a single map */
  def mergeTables(inputs: Seq[InputTable], idIdces: Seq[Int], valIdx: Int,
                  numHeaderLines: Int, delimiter: Char = '\t'): Map[Sample, Map[Feature, Value]] = {

    require(numHeaderLines >= 0, "Number of header lines must not be negative")

    inputs
      // map each table's base name to its parsed feature-to-value map
      .map {
        case InputTable(name, source) =>
          val featureValues: Seq[(Feature, Value)] = source
            .getLines()
            // drop header lines according to input
            .drop(numHeaderLines)
            // process the line into a feature, value pair
            .map(processLine(_, idIdces, valIdx))
            // turn into seq
            .toSeq
          // raise an error on duplicate feature IDs (otherwise toMap would silently keep only one value per feature)
          require(featureValues.map(_._1).distinct.size == featureValues.size,
            s"Duplicate features exist in $name")
          name -> featureValues.toMap
      }.toMap
  }
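
  // Illustrative sketch (hypothetical files): given two header-less tables
  // keyed on column 0 with values in column 1,
  //   sampleA.tsv contains "geneX\t1" and "geneY\t2"
  //   sampleB.tsv contains "geneX\t3"
  // merging yields one feature-to-value map per sample:
  //   mergeTables(prepInput(Seq(new File("sampleA.tsv"), new File("sampleB.tsv")), ".tsv"),
  //     idIdces = Seq(0), valIdx = 1, numHeaderLines = 0)
  //   // -> Map("sampleA" -> Map("geneX" -> "1", "geneY" -> "2"),
  //   //        "sampleB" -> Map("geneX" -> "3"))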

  /** Writes the merged results to the given writer (stdout or a file) */
  def writeOutput(results: Map[Sample, Map[Feature, Value]], output: BufferedWriter,
                  fallback: String, featureName: String): Unit = {
    // sort samples alphabetically
    val samples: Seq[Sample] = results.keys.toSeq.sorted
    // create union of all feature IDs
    val features: Seq[Feature] = results
      // retrieve feature names from each sample
      .map { case (_, featureMap) => featureMap.keySet }
      // fold all of them into a single container
      .foldLeft(MutSet.empty[Feature]) { case (acc, x) => acc ++= x }
      // and create a sorted sequence
      .toSeq.sorted

    output.write((featureName +: samples).mkString("\t") + "\n")
    features.foreach { feature =>
      // get feature values for each sample (order == order of samples in header)
      val line = feature +: samples
        .map(results(_).getOrElse(feature, fallback))
      output.write(line.mkString("\t") + "\n")
    }
    output.flush()
  }
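
  // Illustrative sketch: writing the merged map from the example above with
  // fallback "-" and feature column name "feature" produces (tab-separated):
  //   feature  sampleA  sampleB
  //   geneX    1        3
  //   geneY    2        -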

  /** Default arguments */
  case class Args(inputTables: Seq[File] = Seq.empty[File],
                  idColumnName: String = "feature",
                  idColumnIndices: Seq[Int] = Seq.empty[Int],
                  valueColumnIndex: Int = -1,
                  fileExtension: String = "",
                  numHeaderLines: Int = 0,
                  fallbackString: String = "-",
                  delimiter: Char = '\t',
                  out: File = new File("-")) extends AbstractArgs

  /** Command line argument parser */
  class OptParser extends AbstractOptParser {

    import scopt.Read

    // implicit Read instance so scopt can parse single-character arguments (e.g. the delimiter)
    implicit val charRead: Read[Char] = Read.reads { _.toCharArray.head }

    head(
      s"""
         |$commandName - Tabular file merging based on feature ID equality.
      """.stripMargin)

    opt[Seq[Int]]('i', "id_column_index") required () valueName "<idx1>,<idx2>, ..." action { (x, c) =>
      c.copy(idColumnIndices = x.map(_ - 1)) // -1 to convert to Scala-style 0-based indexing
    } validate {
      x => if (x.forall(_ > 0)) success else failure("Index must be at least 1")
    } text "Index of feature ID column from each input file (1-based)"

    opt[Int]('a', "value_column_index") required () valueName "<idx>" action { (x, c) =>
      c.copy(valueColumnIndex = x - 1) // -1 to convert to Scala-style 0-based indexing
    } validate {
      x => if (x > 0) success else failure("Index must be at least 1")
    } text "Index of column from each input file containing the value to merge (1-based)"

    opt[File]('o', "output") optional () valueName "<path>" action { (x, c) =>
      c.copy(out = x)
    } text "Path to output file (default: '-' <stdout>)"

    opt[String]('n', "id_column_name") optional () valueName "<name>" action { (x, c) =>
      c.copy(idColumnName = x)
    } text "Name of feature ID column in the output merged file (default: feature)"

    opt[String]('e', "strip_extension") optional () valueName "<ext>" action { (x, c) =>
      c.copy(fileExtension = x)
    } text "Common extension of all input tables to strip (default: empty string)"

    opt[Int]('m', "num_header_lines") optional () action { (x, c) =>
      c.copy(numHeaderLines = x)
    } text "The number of header lines present in all input files (default: 0; no header)"

    opt[String]('f', "fallback") optional () action { (x, c) =>
      c.copy(fallbackString = x)
    } text "The string to use when a value for a feature is missing in one or more sample(s) (default: '-')"

    opt[Char]('d', "delimiter") optional () action { (x, c) =>
      c.copy(delimiter = x)
    } text "The character used for separating columns in the input files (default: '\\t')"

    arg[File]("<input_tables> ...") unbounded () optional () action { (x, c) =>
      c.copy(inputTables = c.inputTables :+ x)
    } validate {
      x => if (x.exists) success else failure(s"File '$x' does not exist")
    } text "Input tables to merge"

    note(
      """
        |This tool merges multiple tabular files (tab-delimited by default)
        |and outputs a single tab-delimited file whose columns are the
        |feature IDs and a single column from each input file.
        |
        |Note that each input file must not contain duplicate features;
        |if it does, the tool exits with an error.
      """.stripMargin)

  }

  /** Parses the command line argument */
  def parseArgs(args: Array[String]): Args = new OptParser()
    .parse(args, Args())
    .getOrElse(sys.exit(1))
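
  // Illustrative invocation (hypothetical file names), assuming the tool is
  // launched via its main class:
  //   java -cp <biopet-jar> nl.lumc.sasc.biopet.tools.MergeTables \
  //     -i 1 -a 2 -m 1 -o merged.tsv sampleA.tsv sampleB.tsv
  // This merges column 2 of both inputs, keyed on column 1, skipping one
  // header line per file.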

  /** Transforms the input file seq into a seq of [[InputTable]] objects */
  def prepInput(inFiles: Seq[File], ext: String = ""): Seq[InputTable] = {
    require(inFiles.map(_.getName.stripSuffix(ext)).distinct.size == inFiles.size, "Duplicate samples exist in inputs")
    inFiles
      .map(tableFile => InputTable(tableFile.getName.stripSuffix(ext), Source.fromFile(tableFile)))
  }
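
  // Illustrative sketch (hypothetical file names): stripping the shared
  // extension turns file names into sample names:
  //   prepInput(Seq(new File("sampleA.counts"), new File("sampleB.counts")), ".counts")
  //   // -> Seq(InputTable("sampleA", ...), InputTable("sampleB", ...))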

  /** Creates the output writer object */
  def prepOutput(outFile: File): BufferedWriter = outFile match {
    case f if f.toString == "-" => new BufferedWriter(new OutputStreamWriter(System.out))
    case otherwise              => new BufferedWriter(new FileWriter(otherwise))
  }
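
  // Note: prepOutput(new File("-")) wraps System.out, matching the '-'
  // convention of the --output flag; any other path opens a regular file.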

  /** Main entry point */
  def main(args: Array[String]): Unit = {
    val commandArgs: Args = parseArgs(args)

    import commandArgs._

    val outStream = prepOutput(out)
    val merged = mergeTables(prepInput(inputTables, fileExtension),
      idColumnIndices, valueColumnIndex, numHeaderLines, delimiter)
    writeOutput(merged, outStream, fallbackString, idColumnName)
    outStream.close()
  }
}