Toucan.scala 11 KB
Newer Older
bow's avatar
bow committed
1
/**
Peter van 't Hof's avatar
Peter van 't Hof committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
Sander Bollen's avatar
Sander Bollen committed
16 17
package nl.lumc.sasc.biopet.pipelines.toucan

18 19
import java.io.File

20
import htsjdk.samtools.reference.FastaSequenceFile
Peter van 't Hof's avatar
Peter van 't Hof committed
21 22
import nl.lumc.sasc.biopet.core._
import nl.lumc.sasc.biopet.core.summary.SummaryQScript
Sander Bollen's avatar
Sander Bollen committed
23
import nl.lumc.sasc.biopet.extensions.bcftools.BcftoolsView
Peter van 't Hof's avatar
Peter van 't Hof committed
24 25 26 27 28
import nl.lumc.sasc.biopet.extensions.bedtools.{ BedtoolsIntersect, BedtoolsMerge }
import nl.lumc.sasc.biopet.extensions.gatk.{ CatVariants, SelectVariants }
import nl.lumc.sasc.biopet.extensions.manwe.{ ManweAnnotateVcf, ManweSamplesImport }
import nl.lumc.sasc.biopet.extensions.tools.{ GvcfToBed, VcfWithVcf, VepNormalizer }
import nl.lumc.sasc.biopet.extensions.{ Bgzip, Ln, VariantEffectPredictor }
Peter van 't Hof's avatar
Peter van 't Hof committed
29
import nl.lumc.sasc.biopet.utils.VcfUtils
Peter van 't Hof's avatar
Peter van 't Hof committed
30
import nl.lumc.sasc.biopet.utils.config.Configurable
31
import nl.lumc.sasc.biopet.utils.intervals.BedRecordList
Sander Bollen's avatar
Sander Bollen committed
32
import org.broadinstitute.gatk.queue.QScript
bow's avatar
bow committed
33
import scalaz._, Scalaz._
Sander Bollen's avatar
Sander Bollen committed
34 35

/**
Peter van 't Hof's avatar
Peter van 't Hof committed
36 37 38 39
 * Pipeline to annotate a vcf file with VEP
 *
 * Created by ahbbollen on 15-1-15.
 */
Sander Bollen's avatar
Sander Bollen committed
40
class Toucan(val root: Configurable) extends QScript with BiopetQScript with SummaryQScript with Reference {
Sander Bollen's avatar
Sander Bollen committed
41 42 43
  def this() = this(null)

  @Input(doc = "Input VCF file", shortName = "Input", required = true)
44
  var inputVcf: File = _
Sander Bollen's avatar
Sander Bollen committed
45

Peter van 't Hof's avatar
Peter van 't Hof committed
46
  @Input(doc = "Input GVCF file", shortName = "gvcf", required = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
47 48
  var inputGvcf: Option[File] = None

Peter van 't Hof's avatar
Peter van 't Hof committed
49 50
  def outputName = inputVcf.getName.stripSuffix(".vcf.gz")

51
  def outputVcf: File = (gonlVcfFile, exacVcfFile) match {
Peter van 't Hof's avatar
Peter van 't Hof committed
52 53 54 55
    case (Some(_), Some(_)) => new File(outputDir, s"$outputName.vep.normalized.gonl.exac.vcf.gz")
    case (Some(_), _)       => new File(outputDir, s"$outputName.vep.normalized.gonl.vcf.gz")
    case (_, Some(_))       => new File(outputDir, s"$outputName.vep.normalized.exac.vcf.gz")
    case _                  => new File(outputDir, s"$outputName.vep.normalized.vcf.gz")
56
  }
57

58 59
  lazy val minScatterGenomeSize: Long = config("min_scatter_genome_size", default = 75000000)

Peter van 't Hof's avatar
Peter van 't Hof committed
60
  lazy val enableScatter: Boolean = config("enable_scatter", default = {
61 62 63 64 65 66
    val ref = new FastaSequenceFile(referenceFasta(), true)
    val refLenght = ref.getSequenceDictionary.getReferenceLength
    ref.close()
    refLenght > minScatterGenomeSize
  })

Sander Bollen's avatar
Sander Bollen committed
67
  def sampleInfo: Map[String, Map[String, Any]] = root match {
Sander Bollen's avatar
Sander Bollen committed
68
    case m: MultiSampleQScript => m.samples.map { case (sampleId, sample) => sampleId -> sample.sampleTags }
69
    case null                  => VcfUtils.getSampleIds(inputVcf).map(x => x -> Map[String, Any]()).toMap
Sander Bollen's avatar
Sander Bollen committed
70 71
    case s: SampleLibraryTag   => s.sampleId.map(x => x -> Map[String, Any]()).toMap
    case _                     => throw new IllegalArgumentException("")
Sander Bollen's avatar
Sander Bollen committed
72 73
  }

74 75
  lazy val gonlVcfFile: Option[File] = config("gonl_vcf")
  lazy val exacVcfFile: Option[File] = config("exac_vcf")
76
  lazy val doVarda: Boolean = config("use_varda", default = false)
77

Sander Bollen's avatar
Sander Bollen committed
78
  def init(): Unit = {
79 80
    require(inputVcf != null, "No Input vcf given")
    inputFiles :+= new InputFile(inputVcf)
Sander Bollen's avatar
Sander Bollen committed
81 82
  }

83
  override def defaults = Map(
Sander Bollen's avatar
Sander Bollen committed
84
    "varianteffectpredictor" -> Map("everything" -> true, "failed" -> 1, "allow_non_variant" -> true)
85
  )
Sander Bollen's avatar
Sander Bollen committed
86 87

  def biopetScript(): Unit = {
88
    val useVcf: File = if (doVarda) {
Peter van 't Hof's avatar
Peter van 't Hof committed
89
      inputGvcf match {
90
        case Some(s) => varda(inputVcf, s)
91 92
        case _       => throw new IllegalArgumentException("You have not specified a GVCF file")
      }
93
    } else inputVcf
94

95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
    if (enableScatter) {
      val outputVcfFiles = BedRecordList.fromReference(referenceFasta())
        .scatter(config("bin_size", default = 50000000))
        .allRecords.map { region =>

          val chunkName = s"${region.chr}-${region.start}-${region.end}"
          val chunkDir = new File(outputDir, "chunk" + File.separator + chunkName)
          chunkDir.mkdirs()
          val bedFile = new File(chunkDir, chunkName + ".bed")
          BedRecordList.fromList(List(region)).writeToFile(bedFile)
          bedFile.deleteOnExit()
          val sv = new SelectVariants(this)
          sv.variant = useVcf
          sv.out = new File(chunkDir, chunkName + ".vcf.gz")
          sv.intervals :+= bedFile
          sv.isIntermediate = true
          add(sv)

          runChunk(sv.out, chunkDir, chunkName)
Peter van 't Hof's avatar
Peter van 't Hof committed
114 115
        }

116 117
      val cv = new CatVariants(this)
      cv.variant = outputVcfFiles.toList
118
      cv.outputFile = outputVcf
119
      add(cv)
Peter van 't Hof's avatar
Peter van 't Hof committed
120
    } else runChunk(useVcf, outputDir, outputName)
121 122 123

    addSummaryJobs()
  }
124

125
  protected def runChunk(file: File, chunkDir: File, chunkName: String): File = {
126 127 128 129 130 131 132 133 134 135
    val vep = new VariantEffectPredictor(this)
    vep.input = file
    vep.output = new File(chunkDir, chunkName + ".vep.vcf")
    vep.isIntermediate = true
    add(vep)
    addSummarizable(vep, "variant_effect_predictor")

    val normalizer = new VepNormalizer(this)
    normalizer.inputVCF = vep.output
    normalizer.outputVcf = new File(chunkDir, chunkName + ".vep.normalized.vcf.gz")
136
    normalizer.isIntermediate = enableScatter || gonlVcfFile.isDefined || exacVcfFile.isDefined
137 138 139 140 141 142 143 144 145 146 147
    add(normalizer)

    var outputFile = normalizer.outputVcf

    gonlVcfFile match {
      case Some(gonlFile) =>
        val vcfWithVcf = new VcfWithVcf(this)
        vcfWithVcf.input = outputFile
        vcfWithVcf.secondaryVcf = gonlFile
        vcfWithVcf.output = swapExt(chunkDir, normalizer.outputVcf, ".vcf.gz", ".gonl.vcf.gz")
        vcfWithVcf.fields ::= ("AF", "AF_gonl", None)
148
        vcfWithVcf.isIntermediate = enableScatter || exacVcfFile.isDefined
149 150 151
        add(vcfWithVcf)
        outputFile = vcfWithVcf.output
      case _ =>
Peter van 't Hof's avatar
Peter van 't Hof committed
152
    }
153

154 155 156 157 158 159 160
    exacVcfFile match {
      case Some(exacFile) =>
        val vcfWithVcf = new VcfWithVcf(this)
        vcfWithVcf.input = outputFile
        vcfWithVcf.secondaryVcf = exacFile
        vcfWithVcf.output = swapExt(chunkDir, outputFile, ".vcf.gz", ".exac.vcf.gz")
        vcfWithVcf.fields ::= ("AF", "AF_exac", None)
161
        vcfWithVcf.isIntermediate = enableScatter
162 163 164 165 166 167
        add(vcfWithVcf)
        outputFile = vcfWithVcf.output
      case _ =>
    }

    outputFile
Sander Bollen's avatar
Sander Bollen committed
168 169
  }

170
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
171 172 173 174 175 176 177 178
   * Performs the varda import and activate for one sample
   *
   * @param sampleID the sampleID to be used
   * @param inputVcf the input VCF
   * @param gVCF the gVCF for coverage
   * @param annotation: ManweDownloadAnnotateVcf object of annotated vcf
   * @return
   */
Sander Bollen's avatar
Sander Bollen committed
179
  def importAndActivateSample(sampleID: String, sampleGroups: List[String], inputVcf: File,
180 181
                              gVCF: File, annotation: ManweAnnotateVcf): ManweActivateAfterAnnotImport = {

Sander Bollen's avatar
Sander Bollen committed
182 183
    val minGQ: Int = config("minimum_genome_quality", default = 20, namespace = "manwe")
    val isPublic: Boolean = config("varda_is_public", default = true, namespace = "manwe")
184 185 186 187 188 189 190 191 192

    val bedTrack = new GvcfToBed(this)
    bedTrack.inputVcf = gVCF
    bedTrack.outputBed = swapExt(outputDir, gVCF, ".vcf.gz", s""".$sampleID.bed""")
    bedTrack.minQuality = minGQ
    bedTrack.isIntermediate = true
    bedTrack.sample = Some(sampleID)
    add(bedTrack)

193 194 195 196 197 198
    val mergedBed = new BedtoolsMerge(this)
    mergedBed.input = bedTrack.outputBed
    mergedBed.dist = 5
    mergedBed.output = swapExt(outputDir, bedTrack.outputBed, ".bed", ".merged.bed")
    add(mergedBed)

199
    val bgzippedBed = new Bgzip(this)
200 201
    bgzippedBed.input = List(mergedBed.output)
    bgzippedBed.output = swapExt(outputDir, mergedBed.output, ".bed", ".bed.gz")
202 203 204
    add(bgzippedBed)

    val singleVcf = new BcftoolsView(this)
205 206
    singleVcf.input = inputVcf
    singleVcf.output = swapExt(outputDir, inputVcf, ".vcf.gz", s""".$sampleID.vcf.gz""")
207 208 209 210 211 212 213 214
    singleVcf.samples = List(sampleID)
    singleVcf.minAC = Some(1)
    singleVcf.isIntermediate = true
    add(singleVcf)

    val intersected = new BedtoolsIntersect(this)
    intersected.input = singleVcf.output
    intersected.intersectFile = bgzippedBed.output
215
    intersected.output = swapExt(outputDir, singleVcf.output, ".vcf.gz", ".intersected.vcf")
216 217
    add(intersected)

218 219 220 221 222
    val bgzippedIntersect = new Bgzip(this)
    bgzippedIntersect.input = List(intersected.output)
    bgzippedIntersect.output = swapExt(outputDir, intersected.output, ".vcf", ".vcf.gz")
    add(bgzippedIntersect)

223
    val imported = new ManweSamplesImport(this)
224
    imported.vcfs = List(bgzippedIntersect.output)
225 226 227
    imported.beds = List(bgzippedBed.output)
    imported.name = Some(sampleID)
    imported.public = isPublic
Sander Bollen's avatar
Sander Bollen committed
228
    imported.group = sampleGroups
229 230 231 232 233 234 235 236 237 238 239 240
    imported.waitToComplete = false
    imported.isIntermediate = true
    imported.output = swapExt(outputDir, intersected.output, ".vcf.gz", ".manwe.import")
    add(imported)

    val active = new ManweActivateAfterAnnotImport(this, annotation, imported)
    active.output = swapExt(outputDir, imported.output, ".import", ".activated")
    add(active)
    active

  }

Sander Bollen's avatar
Sander Bollen committed
241
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
242 243 244 245 246 247
   * Perform varda analysis
   *
   * @param vcf input vcf
   * @param gVcf The gVCF to be used for coverage calculations
   * @return return vcf
   */
248
  def varda(vcf: File, gVcf: File): File = {
249

Sander Bollen's avatar
Sander Bollen committed
250
    val annotationQueries: List[String] = config("annotation_queries", default = List("GLOBAL *"), namespace = "manwe")
Sander Bollen's avatar
Sander Bollen committed
251 252

    val annotate = new ManweAnnotateVcf(this)
253
    annotate.vcf = vcf
254 255 256
    if (annotationQueries.nonEmpty) {
      annotate.queries = annotationQueries
    }
Sander Bollen's avatar
Sander Bollen committed
257
    annotate.waitToComplete = true
258 259
    annotate.output = swapExt(outputDir, vcf, ".vcf.gz", ".manwe.annot")
    annotate.isIntermediate = true
Sander Bollen's avatar
Sander Bollen committed
260 261 262
    add(annotate)

    val annotatedVcf = new ManweDownloadAfterAnnotate(this, annotate)
263
    annotatedVcf.output = swapExt(outputDir, annotate.output, ".manwe.annot", "manwe.annot.vcf.gz")
264
    add(annotatedVcf)
Sander Bollen's avatar
Sander Bollen committed
265

Sander Bollen's avatar
Sander Bollen committed
266
    val activates = sampleInfo map { x =>
bow's avatar
bow committed
267 268 269 270 271 272 273
      val maybeSampleGroup = x._2.get("varda_group") match {
        case None => Some(Nil)
        case Some(vals) => vals match {
          case xs: List[_] => xs
            .traverse[Option, String] { x => Option(x.toString).filter(_ == x) }
          case otherwise => None
        }
Sander Bollen's avatar
Sander Bollen committed
274
      }
bow's avatar
bow committed
275 276
      val sampleGroup = maybeSampleGroup
        .getOrElse(throw new IllegalArgumentException("Sample tag 'varda_group' is not a list of strings"))
Sander Bollen's avatar
Sander Bollen committed
277 278
      importAndActivateSample(x._1, sampleGroup, vcf, gVcf, annotate)
    }
Sander Bollen's avatar
Sander Bollen committed
279 280

    val finalLn = new Ln(this)
Sander Bollen's avatar
Sander Bollen committed
281
    activates.foreach(x => finalLn.deps :+= x.output)
Sander Bollen's avatar
Sander Bollen committed
282
    finalLn.input = annotatedVcf.output
283
    finalLn.output = swapExt(outputDir, annotatedVcf.output, "manwe.annot.vcf.gz", ".varda_annotated.vcf.gz")
Sander Bollen's avatar
Sander Bollen committed
284
    finalLn.relative = true
Sander Bollen's avatar
Sander Bollen committed
285
    add(finalLn)
Sander Bollen's avatar
Sander Bollen committed
286 287 288 289

    finalLn.output
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
290 291
  def summaryFile = new File(outputDir, "Toucan.summary.json")

292
  def summaryFiles = Map("input_vcf" -> inputVcf, "outputVcf" -> outputVcf)
Peter van 't Hof's avatar
Peter van 't Hof committed
293 294

  def summarySettings = Map()
Sander Bollen's avatar
Sander Bollen committed
295 296 297
}

object Toucan extends PipelineCommand