Toucan.scala 11.2 KB
Newer Older
bow's avatar
bow committed
1
/**
Peter van 't Hof's avatar
Peter van 't Hof committed
2 3 4 5 6 7 8 9 10
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
11
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
Peter van 't Hof's avatar
Peter van 't Hof committed
12 13 14
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
Sander Bollen's avatar
Sander Bollen committed
15 16
package nl.lumc.sasc.biopet.pipelines.toucan

17 18
import java.io.File

19
import htsjdk.samtools.reference.FastaSequenceFile
Peter van 't Hof's avatar
Peter van 't Hof committed
20 21
import nl.lumc.sasc.biopet.core._
import nl.lumc.sasc.biopet.core.summary.SummaryQScript
Sander Bollen's avatar
Sander Bollen committed
22
import nl.lumc.sasc.biopet.extensions.bcftools.BcftoolsView
Peter van 't Hof's avatar
Peter van 't Hof committed
23 24 25 26 27
import nl.lumc.sasc.biopet.extensions.bedtools.{ BedtoolsIntersect, BedtoolsMerge }
import nl.lumc.sasc.biopet.extensions.gatk.{ CatVariants, SelectVariants }
import nl.lumc.sasc.biopet.extensions.manwe.{ ManweAnnotateVcf, ManweSamplesImport }
import nl.lumc.sasc.biopet.extensions.tools.{ GvcfToBed, VcfWithVcf, VepNormalizer }
import nl.lumc.sasc.biopet.extensions.{ Bgzip, Ln, VariantEffectPredictor }
Peter van 't Hof's avatar
Peter van 't Hof committed
28
import nl.lumc.sasc.biopet.utils.VcfUtils
Peter van 't Hof's avatar
Peter van 't Hof committed
29
import nl.lumc.sasc.biopet.utils.config.Configurable
30
import nl.lumc.sasc.biopet.utils.intervals.BedRecordList
Sander Bollen's avatar
Sander Bollen committed
31
import org.broadinstitute.gatk.queue.QScript
bow's avatar
bow committed
32
import scalaz._, Scalaz._
Sander Bollen's avatar
Sander Bollen committed
33 34

/**
Peter van 't Hof's avatar
Peter van 't Hof committed
35 36 37 38
 * Pipeline to annotate a vcf file with VEP
 *
 * Created by ahbbollen on 15-1-15.
 */
Sander Bollen's avatar
Sander Bollen committed
39
class Toucan(val root: Configurable) extends QScript with BiopetQScript with SummaryQScript with Reference {
Sander Bollen's avatar
Sander Bollen committed
40 41 42
  def this() = this(null)

  @Input(doc = "Input VCF file", shortName = "Input", required = true)
43
  var inputVcf: File = _
Sander Bollen's avatar
Sander Bollen committed
44

Peter van 't Hof's avatar
Peter van 't Hof committed
45
  @Input(doc = "Input GVCF file", shortName = "gvcf", required = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
46 47
  var inputGvcf: Option[File] = None

Peter van 't Hof's avatar
Peter van 't Hof committed
48 49
  def outputName = inputVcf.getName.stripSuffix(".vcf.gz")

50
  def outputVcf: File = (gonlVcfFile, exacVcfFile) match {
Peter van 't Hof's avatar
Peter van 't Hof committed
51 52 53 54
    case (Some(_), Some(_)) => new File(outputDir, s"$outputName.vep.normalized.gonl.exac.vcf.gz")
    case (Some(_), _)       => new File(outputDir, s"$outputName.vep.normalized.gonl.vcf.gz")
    case (_, Some(_))       => new File(outputDir, s"$outputName.vep.normalized.exac.vcf.gz")
    case _                  => new File(outputDir, s"$outputName.vep.normalized.vcf.gz")
55
  }
56

57 58
  lazy val minScatterGenomeSize: Long = config("min_scatter_genome_size", default = 75000000)

Peter van 't Hof's avatar
Peter van 't Hof committed
59
  lazy val enableScatter: Boolean = config("enable_scatter", default = {
60
    val ref = new FastaSequenceFile(referenceFasta(), true)
Sander Bollen's avatar
Sander Bollen committed
61
    val refLength = ref.getSequenceDictionary.getReferenceLength
62
    ref.close()
Sander Bollen's avatar
Sander Bollen committed
63
    refLength > minScatterGenomeSize
64 65
  })

Sander Bollen's avatar
Sander Bollen committed
66
  def sampleInfo: Map[String, Map[String, Any]] = root match {
Sander Bollen's avatar
Sander Bollen committed
67
    case m: MultiSampleQScript => m.samples.map { case (sampleId, sample) => sampleId -> sample.sampleTags }
68
    case null                  => VcfUtils.getSampleIds(inputVcf).map(x => x -> Map[String, Any]()).toMap
Sander Bollen's avatar
Sander Bollen committed
69 70
    case s: SampleLibraryTag   => s.sampleId.map(x => x -> Map[String, Any]()).toMap
    case _                     => throw new IllegalArgumentException("")
Sander Bollen's avatar
Sander Bollen committed
71 72
  }

73 74
  lazy val gonlVcfFile: Option[File] = config("gonl_vcf")
  lazy val exacVcfFile: Option[File] = config("exac_vcf")
75
  lazy val doVarda: Boolean = config("use_varda", default = false)
76

Sander Bollen's avatar
Sander Bollen committed
77
  def init(): Unit = {
78 79
    require(inputVcf != null, "No Input vcf given")
    inputFiles :+= new InputFile(inputVcf)
Sander Bollen's avatar
Sander Bollen committed
80 81
  }

82
  override def defaults = Map(
Sander Bollen's avatar
Sander Bollen committed
83
    "varianteffectpredictor" -> Map("everything" -> true, "failed" -> 1, "allow_non_variant" -> true)
84
  )
Sander Bollen's avatar
Sander Bollen committed
85 86

  def biopetScript(): Unit = {
87
    val useVcf: File = if (doVarda) {
Peter van 't Hof's avatar
Peter van 't Hof committed
88
      inputGvcf match {
89
        case Some(s) => varda(inputVcf, s)
90 91
        case _       => throw new IllegalArgumentException("You have not specified a GVCF file")
      }
92
    } else inputVcf
93

94
    if (enableScatter) {
Sander Bollen's avatar
Sander Bollen committed
95
      val doNotRemove: Boolean = config("do_not_remove", default = false)
Sander Bollen's avatar
Sander Bollen committed
96 97 98
      if (doNotRemove) {
        logger.warn("Chunking combined with do_not_remove possibly leads to mangled CSQ fields")
      }
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
      val outputVcfFiles = BedRecordList.fromReference(referenceFasta())
        .scatter(config("bin_size", default = 50000000))
        .allRecords.map { region =>

          val chunkName = s"${region.chr}-${region.start}-${region.end}"
          val chunkDir = new File(outputDir, "chunk" + File.separator + chunkName)
          chunkDir.mkdirs()
          val bedFile = new File(chunkDir, chunkName + ".bed")
          BedRecordList.fromList(List(region)).writeToFile(bedFile)
          bedFile.deleteOnExit()
          val sv = new SelectVariants(this)
          sv.variant = useVcf
          sv.out = new File(chunkDir, chunkName + ".vcf.gz")
          sv.intervals :+= bedFile
          sv.isIntermediate = true
          add(sv)

          runChunk(sv.out, chunkDir, chunkName)
Peter van 't Hof's avatar
Peter van 't Hof committed
117 118
        }

119 120
      val cv = new CatVariants(this)
      cv.variant = outputVcfFiles.toList
121
      cv.outputFile = outputVcf
122
      add(cv)
Peter van 't Hof's avatar
Peter van 't Hof committed
123
    } else runChunk(useVcf, outputDir, outputName)
124 125 126

    addSummaryJobs()
  }
127

128
  protected def runChunk(file: File, chunkDir: File, chunkName: String): File = {
129 130 131 132 133 134 135 136 137 138
    val vep = new VariantEffectPredictor(this)
    vep.input = file
    vep.output = new File(chunkDir, chunkName + ".vep.vcf")
    vep.isIntermediate = true
    add(vep)
    addSummarizable(vep, "variant_effect_predictor")

    val normalizer = new VepNormalizer(this)
    normalizer.inputVCF = vep.output
    normalizer.outputVcf = new File(chunkDir, chunkName + ".vep.normalized.vcf.gz")
139
    normalizer.isIntermediate = enableScatter || gonlVcfFile.isDefined || exacVcfFile.isDefined
140 141 142 143 144 145 146 147 148 149 150
    add(normalizer)

    var outputFile = normalizer.outputVcf

    gonlVcfFile match {
      case Some(gonlFile) =>
        val vcfWithVcf = new VcfWithVcf(this)
        vcfWithVcf.input = outputFile
        vcfWithVcf.secondaryVcf = gonlFile
        vcfWithVcf.output = swapExt(chunkDir, normalizer.outputVcf, ".vcf.gz", ".gonl.vcf.gz")
        vcfWithVcf.fields ::= ("AF", "AF_gonl", None)
151
        vcfWithVcf.isIntermediate = enableScatter || exacVcfFile.isDefined
152 153 154
        add(vcfWithVcf)
        outputFile = vcfWithVcf.output
      case _ =>
Peter van 't Hof's avatar
Peter van 't Hof committed
155
    }
156

157 158 159 160 161 162 163
    exacVcfFile match {
      case Some(exacFile) =>
        val vcfWithVcf = new VcfWithVcf(this)
        vcfWithVcf.input = outputFile
        vcfWithVcf.secondaryVcf = exacFile
        vcfWithVcf.output = swapExt(chunkDir, outputFile, ".vcf.gz", ".exac.vcf.gz")
        vcfWithVcf.fields ::= ("AF", "AF_exac", None)
164
        vcfWithVcf.isIntermediate = enableScatter
165 166 167 168 169 170
        add(vcfWithVcf)
        outputFile = vcfWithVcf.output
      case _ =>
    }

    outputFile
Sander Bollen's avatar
Sander Bollen committed
171 172
  }

173
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
174 175 176 177 178 179 180 181
   * Performs the varda import and activate for one sample
   *
   * @param sampleID the sampleID to be used
   * @param inputVcf the input VCF
   * @param gVCF the gVCF for coverage
   * @param annotation: ManweDownloadAnnotateVcf object of annotated vcf
   * @return
   */
Sander Bollen's avatar
Sander Bollen committed
182
  def importAndActivateSample(sampleID: String, sampleGroups: List[String], inputVcf: File,
183 184
                              gVCF: File, annotation: ManweAnnotateVcf): ManweActivateAfterAnnotImport = {

Sander Bollen's avatar
Sander Bollen committed
185 186
    val minGQ: Int = config("minimum_genome_quality", default = 20, namespace = "manwe")
    val isPublic: Boolean = config("varda_is_public", default = true, namespace = "manwe")
187 188 189 190 191 192 193 194 195

    val bedTrack = new GvcfToBed(this)
    bedTrack.inputVcf = gVCF
    bedTrack.outputBed = swapExt(outputDir, gVCF, ".vcf.gz", s""".$sampleID.bed""")
    bedTrack.minQuality = minGQ
    bedTrack.isIntermediate = true
    bedTrack.sample = Some(sampleID)
    add(bedTrack)

196 197 198 199 200 201
    val mergedBed = new BedtoolsMerge(this)
    mergedBed.input = bedTrack.outputBed
    mergedBed.dist = 5
    mergedBed.output = swapExt(outputDir, bedTrack.outputBed, ".bed", ".merged.bed")
    add(mergedBed)

202
    val bgzippedBed = new Bgzip(this)
203 204
    bgzippedBed.input = List(mergedBed.output)
    bgzippedBed.output = swapExt(outputDir, mergedBed.output, ".bed", ".bed.gz")
205 206 207
    add(bgzippedBed)

    val singleVcf = new BcftoolsView(this)
208 209
    singleVcf.input = inputVcf
    singleVcf.output = swapExt(outputDir, inputVcf, ".vcf.gz", s""".$sampleID.vcf.gz""")
210 211 212 213 214 215 216 217
    singleVcf.samples = List(sampleID)
    singleVcf.minAC = Some(1)
    singleVcf.isIntermediate = true
    add(singleVcf)

    val intersected = new BedtoolsIntersect(this)
    intersected.input = singleVcf.output
    intersected.intersectFile = bgzippedBed.output
218
    intersected.output = swapExt(outputDir, singleVcf.output, ".vcf.gz", ".intersected.vcf")
219 220
    add(intersected)

221 222 223 224 225
    val bgzippedIntersect = new Bgzip(this)
    bgzippedIntersect.input = List(intersected.output)
    bgzippedIntersect.output = swapExt(outputDir, intersected.output, ".vcf", ".vcf.gz")
    add(bgzippedIntersect)

226
    val imported = new ManweSamplesImport(this)
227
    imported.vcfs = List(bgzippedIntersect.output)
228 229 230
    imported.beds = List(bgzippedBed.output)
    imported.name = Some(sampleID)
    imported.public = isPublic
Sander Bollen's avatar
Sander Bollen committed
231
    imported.group = sampleGroups
232 233 234 235 236 237 238 239 240 241 242 243
    imported.waitToComplete = false
    imported.isIntermediate = true
    imported.output = swapExt(outputDir, intersected.output, ".vcf.gz", ".manwe.import")
    add(imported)

    val active = new ManweActivateAfterAnnotImport(this, annotation, imported)
    active.output = swapExt(outputDir, imported.output, ".import", ".activated")
    add(active)
    active

  }

Sander Bollen's avatar
Sander Bollen committed
244
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
245 246 247 248 249 250
   * Perform varda analysis
   *
   * @param vcf input vcf
   * @param gVcf The gVCF to be used for coverage calculations
   * @return return vcf
   */
251
  def varda(vcf: File, gVcf: File): File = {
252

Sander Bollen's avatar
Sander Bollen committed
253
    val annotationQueries: List[String] = config("annotation_queries", default = List("GLOBAL *"), namespace = "manwe")
Sander Bollen's avatar
Sander Bollen committed
254 255

    val annotate = new ManweAnnotateVcf(this)
256
    annotate.vcf = vcf
257 258 259
    if (annotationQueries.nonEmpty) {
      annotate.queries = annotationQueries
    }
Sander Bollen's avatar
Sander Bollen committed
260
    annotate.waitToComplete = true
261 262
    annotate.output = swapExt(outputDir, vcf, ".vcf.gz", ".manwe.annot")
    annotate.isIntermediate = true
Sander Bollen's avatar
Sander Bollen committed
263 264 265
    add(annotate)

    val annotatedVcf = new ManweDownloadAfterAnnotate(this, annotate)
266
    annotatedVcf.output = swapExt(outputDir, annotate.output, ".manwe.annot", "manwe.annot.vcf.gz")
267
    add(annotatedVcf)
Sander Bollen's avatar
Sander Bollen committed
268

Sander Bollen's avatar
Sander Bollen committed
269
    val activates = sampleInfo map { x =>
bow's avatar
bow committed
270 271 272 273 274 275 276
      val maybeSampleGroup = x._2.get("varda_group") match {
        case None => Some(Nil)
        case Some(vals) => vals match {
          case xs: List[_] => xs
            .traverse[Option, String] { x => Option(x.toString).filter(_ == x) }
          case otherwise => None
        }
Sander Bollen's avatar
Sander Bollen committed
277
      }
bow's avatar
bow committed
278 279
      val sampleGroup = maybeSampleGroup
        .getOrElse(throw new IllegalArgumentException("Sample tag 'varda_group' is not a list of strings"))
Sander Bollen's avatar
Sander Bollen committed
280 281
      importAndActivateSample(x._1, sampleGroup, vcf, gVcf, annotate)
    }
Sander Bollen's avatar
Sander Bollen committed
282 283

    val finalLn = new Ln(this)
Sander Bollen's avatar
Sander Bollen committed
284
    activates.foreach(x => finalLn.deps :+= x.output)
Sander Bollen's avatar
Sander Bollen committed
285
    finalLn.input = annotatedVcf.output
286
    finalLn.output = swapExt(outputDir, annotatedVcf.output, "manwe.annot.vcf.gz", ".varda_annotated.vcf.gz")
Sander Bollen's avatar
Sander Bollen committed
287
    finalLn.relative = true
Sander Bollen's avatar
Sander Bollen committed
288
    add(finalLn)
Sander Bollen's avatar
Sander Bollen committed
289 290 291 292

    finalLn.output
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
293 294
  def summaryFile = new File(outputDir, "Toucan.summary.json")

295
  def summaryFiles = Map("input_vcf" -> inputVcf, "outputVcf" -> outputVcf)
Peter van 't Hof's avatar
Peter van 't Hof committed
296 297

  def summarySettings = Map()
Sander Bollen's avatar
Sander Bollen committed
298 299 300
}

object Toucan extends PipelineCommand