Toucan.scala 11.2 KB
Newer Older
bow's avatar
bow committed
1
/**
Peter van 't Hof's avatar
Peter van 't Hof committed
2 3 4 5 6 7 8 9 10
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
11
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
Peter van 't Hof's avatar
Peter van 't Hof committed
12 13 14
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
Sander Bollen's avatar
Sander Bollen committed
15 16
package nl.lumc.sasc.biopet.pipelines.toucan

17 18
import java.io.File

19
import htsjdk.samtools.reference.FastaSequenceFile
Peter van 't Hof's avatar
Peter van 't Hof committed
20 21
import nl.lumc.sasc.biopet.core._
import nl.lumc.sasc.biopet.core.summary.SummaryQScript
Sander Bollen's avatar
Sander Bollen committed
22
import nl.lumc.sasc.biopet.extensions.bcftools.BcftoolsView
Peter van 't Hof's avatar
Peter van 't Hof committed
23 24 25 26 27
import nl.lumc.sasc.biopet.extensions.bedtools.{ BedtoolsIntersect, BedtoolsMerge }
import nl.lumc.sasc.biopet.extensions.gatk.{ CatVariants, SelectVariants }
import nl.lumc.sasc.biopet.extensions.manwe.{ ManweAnnotateVcf, ManweSamplesImport }
import nl.lumc.sasc.biopet.extensions.tools.{ GvcfToBed, VcfWithVcf, VepNormalizer }
import nl.lumc.sasc.biopet.extensions.{ Bgzip, Ln, VariantEffectPredictor }
Peter van 't Hof's avatar
Peter van 't Hof committed
28
import nl.lumc.sasc.biopet.utils.VcfUtils
Peter van 't Hof's avatar
Peter van 't Hof committed
29
import nl.lumc.sasc.biopet.utils.config.Configurable
30
import nl.lumc.sasc.biopet.utils.intervals.BedRecordList
Sander Bollen's avatar
Sander Bollen committed
31
import org.broadinstitute.gatk.queue.QScript
bow's avatar
bow committed
32
import scalaz._, Scalaz._
Sander Bollen's avatar
Sander Bollen committed
33 34

/**
Peter van 't Hof's avatar
Peter van 't Hof committed
35 36 37 38
 * Pipeline to annotate a vcf file with VEP
 *
 * Created by ahbbollen on 15-1-15.
 */
Sander Bollen's avatar
Sander Bollen committed
39
class Toucan(val root: Configurable) extends QScript with BiopetQScript with SummaryQScript with Reference {
Sander Bollen's avatar
Sander Bollen committed
40 41 42
  def this() = this(null)

  @Input(doc = "Input VCF file", shortName = "Input", required = true)
43
  var inputVcf: File = _
Sander Bollen's avatar
Sander Bollen committed
44

Peter van 't Hof's avatar
Peter van 't Hof committed
45
  @Input(doc = "Input GVCF file", shortName = "gvcf", required = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
46 47
  var inputGvcf: Option[File] = None

Peter van 't Hof's avatar
Peter van 't Hof committed
48 49
  def outputName = inputVcf.getName.stripSuffix(".vcf.gz")

50
  def outputVcf: File = (gonlVcfFile, exacVcfFile) match {
Peter van 't Hof's avatar
Peter van 't Hof committed
51 52 53 54
    case (Some(_), Some(_)) => new File(outputDir, s"$outputName.vep.normalized.gonl.exac.vcf.gz")
    case (Some(_), _)       => new File(outputDir, s"$outputName.vep.normalized.gonl.vcf.gz")
    case (_, Some(_))       => new File(outputDir, s"$outputName.vep.normalized.exac.vcf.gz")
    case _                  => new File(outputDir, s"$outputName.vep.normalized.vcf.gz")
55
  }
56

57 58
  lazy val minScatterGenomeSize: Long = config("min_scatter_genome_size", default = 75000000)

Peter van 't Hof's avatar
Peter van 't Hof committed
59
  lazy val enableScatter: Boolean = config("enable_scatter", default = {
60
    val ref = new FastaSequenceFile(referenceFasta(), true)
Sander Bollen's avatar
Sander Bollen committed
61
    val refLength = ref.getSequenceDictionary.getReferenceLength
62
    ref.close()
Sander Bollen's avatar
Sander Bollen committed
63
    refLength > minScatterGenomeSize
64 65
  })

Sander Bollen's avatar
Sander Bollen committed
66
  def sampleInfo: Map[String, Map[String, Any]] = root match {
Sander Bollen's avatar
Sander Bollen committed
67
    case m: MultiSampleQScript => m.samples.map { case (sampleId, sample) => sampleId -> sample.sampleTags }
68
    case null                  => VcfUtils.getSampleIds(inputVcf).map(x => x -> Map[String, Any]()).toMap
Sander Bollen's avatar
Sander Bollen committed
69 70
    case s: SampleLibraryTag   => s.sampleId.map(x => x -> Map[String, Any]()).toMap
    case _                     => throw new IllegalArgumentException("")
Sander Bollen's avatar
Sander Bollen committed
71 72
  }

73 74
  lazy val gonlVcfFile: Option[File] = config("gonl_vcf")
  lazy val exacVcfFile: Option[File] = config("exac_vcf")
75
  lazy val doVarda: Boolean = config("use_varda", default = false)
76

Sander Bollen's avatar
Sander Bollen committed
77
  def init(): Unit = {
78 79
    require(inputVcf != null, "No Input vcf given")
    inputFiles :+= new InputFile(inputVcf)
Sander Bollen's avatar
Sander Bollen committed
80 81
  }

82
  override def defaults = Map(
Sander Bollen's avatar
Sander Bollen committed
83
    "varianteffectpredictor" -> Map("everything" -> true, "failed" -> 1, "allow_non_variant" -> true)
84
  )
Sander Bollen's avatar
Sander Bollen committed
85 86

  def biopetScript(): Unit = {
87
    val useVcf: File = if (doVarda) {
Peter van 't Hof's avatar
Peter van 't Hof committed
88
      inputGvcf match {
89
        case Some(s) => varda(inputVcf, s)
90 91
        case _       => throw new IllegalArgumentException("You have not specified a GVCF file")
      }
92
    } else inputVcf
93

94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
    if (enableScatter) {
      val outputVcfFiles = BedRecordList.fromReference(referenceFasta())
        .scatter(config("bin_size", default = 50000000))
        .allRecords.map { region =>

          val chunkName = s"${region.chr}-${region.start}-${region.end}"
          val chunkDir = new File(outputDir, "chunk" + File.separator + chunkName)
          chunkDir.mkdirs()
          val bedFile = new File(chunkDir, chunkName + ".bed")
          BedRecordList.fromList(List(region)).writeToFile(bedFile)
          bedFile.deleteOnExit()
          val sv = new SelectVariants(this)
          sv.variant = useVcf
          sv.out = new File(chunkDir, chunkName + ".vcf.gz")
          sv.intervals :+= bedFile
          sv.isIntermediate = true
          add(sv)

          runChunk(sv.out, chunkDir, chunkName)
Peter van 't Hof's avatar
Peter van 't Hof committed
113 114
        }

Peter van 't Hof's avatar
Peter van 't Hof committed
115 116 117
      if (this.functions.filter(_.isInstanceOf[VepNormalizer]).exists(_.asInstanceOf[VepNormalizer].doNotRemove))
        logger.warn("Chunking combined with do_not_remove possibly leads to mangled CSQ fields")

118 119
      val cv = new CatVariants(this)
      cv.variant = outputVcfFiles.toList
120
      cv.outputFile = outputVcf
121
      add(cv)
Peter van 't Hof's avatar
Peter van 't Hof committed
122
    } else runChunk(useVcf, outputDir, outputName)
123 124 125

    addSummaryJobs()
  }
126

127
  protected def runChunk(file: File, chunkDir: File, chunkName: String): File = {
128 129 130 131 132 133 134 135 136 137
    val vep = new VariantEffectPredictor(this)
    vep.input = file
    vep.output = new File(chunkDir, chunkName + ".vep.vcf")
    vep.isIntermediate = true
    add(vep)
    addSummarizable(vep, "variant_effect_predictor")

    val normalizer = new VepNormalizer(this)
    normalizer.inputVCF = vep.output
    normalizer.outputVcf = new File(chunkDir, chunkName + ".vep.normalized.vcf.gz")
138
    normalizer.isIntermediate = enableScatter || gonlVcfFile.isDefined || exacVcfFile.isDefined
139 140 141 142 143 144 145 146 147 148 149
    add(normalizer)

    var outputFile = normalizer.outputVcf

    gonlVcfFile match {
      case Some(gonlFile) =>
        val vcfWithVcf = new VcfWithVcf(this)
        vcfWithVcf.input = outputFile
        vcfWithVcf.secondaryVcf = gonlFile
        vcfWithVcf.output = swapExt(chunkDir, normalizer.outputVcf, ".vcf.gz", ".gonl.vcf.gz")
        vcfWithVcf.fields ::= ("AF", "AF_gonl", None)
150
        vcfWithVcf.isIntermediate = enableScatter || exacVcfFile.isDefined
151 152 153
        add(vcfWithVcf)
        outputFile = vcfWithVcf.output
      case _ =>
Peter van 't Hof's avatar
Peter van 't Hof committed
154
    }
155

156 157 158 159 160 161 162
    exacVcfFile match {
      case Some(exacFile) =>
        val vcfWithVcf = new VcfWithVcf(this)
        vcfWithVcf.input = outputFile
        vcfWithVcf.secondaryVcf = exacFile
        vcfWithVcf.output = swapExt(chunkDir, outputFile, ".vcf.gz", ".exac.vcf.gz")
        vcfWithVcf.fields ::= ("AF", "AF_exac", None)
163
        vcfWithVcf.isIntermediate = enableScatter
164 165 166 167 168 169
        add(vcfWithVcf)
        outputFile = vcfWithVcf.output
      case _ =>
    }

    outputFile
Sander Bollen's avatar
Sander Bollen committed
170 171
  }

172
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
173 174 175 176 177 178 179 180
   * Performs the varda import and activate for one sample
   *
   * @param sampleID the sampleID to be used
   * @param inputVcf the input VCF
   * @param gVCF the gVCF for coverage
   * @param annotation: ManweDownloadAnnotateVcf object of annotated vcf
   * @return
   */
Sander Bollen's avatar
Sander Bollen committed
181
  def importAndActivateSample(sampleID: String, sampleGroups: List[String], inputVcf: File,
182 183
                              gVCF: File, annotation: ManweAnnotateVcf): ManweActivateAfterAnnotImport = {

Sander Bollen's avatar
Sander Bollen committed
184 185
    val minGQ: Int = config("minimum_genome_quality", default = 20, namespace = "manwe")
    val isPublic: Boolean = config("varda_is_public", default = true, namespace = "manwe")
186 187 188 189 190 191 192 193 194

    val bedTrack = new GvcfToBed(this)
    bedTrack.inputVcf = gVCF
    bedTrack.outputBed = swapExt(outputDir, gVCF, ".vcf.gz", s""".$sampleID.bed""")
    bedTrack.minQuality = minGQ
    bedTrack.isIntermediate = true
    bedTrack.sample = Some(sampleID)
    add(bedTrack)

195 196 197 198 199 200
    val mergedBed = new BedtoolsMerge(this)
    mergedBed.input = bedTrack.outputBed
    mergedBed.dist = 5
    mergedBed.output = swapExt(outputDir, bedTrack.outputBed, ".bed", ".merged.bed")
    add(mergedBed)

201
    val bgzippedBed = new Bgzip(this)
202 203
    bgzippedBed.input = List(mergedBed.output)
    bgzippedBed.output = swapExt(outputDir, mergedBed.output, ".bed", ".bed.gz")
204 205 206
    add(bgzippedBed)

    val singleVcf = new BcftoolsView(this)
207 208
    singleVcf.input = inputVcf
    singleVcf.output = swapExt(outputDir, inputVcf, ".vcf.gz", s""".$sampleID.vcf.gz""")
209 210 211 212 213 214 215 216
    singleVcf.samples = List(sampleID)
    singleVcf.minAC = Some(1)
    singleVcf.isIntermediate = true
    add(singleVcf)

    val intersected = new BedtoolsIntersect(this)
    intersected.input = singleVcf.output
    intersected.intersectFile = bgzippedBed.output
217
    intersected.output = swapExt(outputDir, singleVcf.output, ".vcf.gz", ".intersected.vcf")
218 219
    add(intersected)

220 221 222 223 224
    val bgzippedIntersect = new Bgzip(this)
    bgzippedIntersect.input = List(intersected.output)
    bgzippedIntersect.output = swapExt(outputDir, intersected.output, ".vcf", ".vcf.gz")
    add(bgzippedIntersect)

225
    val imported = new ManweSamplesImport(this)
226
    imported.vcfs = List(bgzippedIntersect.output)
227 228 229
    imported.beds = List(bgzippedBed.output)
    imported.name = Some(sampleID)
    imported.public = isPublic
Sander Bollen's avatar
Sander Bollen committed
230
    imported.group = sampleGroups
231 232 233 234 235 236 237 238 239 240 241 242
    imported.waitToComplete = false
    imported.isIntermediate = true
    imported.output = swapExt(outputDir, intersected.output, ".vcf.gz", ".manwe.import")
    add(imported)

    val active = new ManweActivateAfterAnnotImport(this, annotation, imported)
    active.output = swapExt(outputDir, imported.output, ".import", ".activated")
    add(active)
    active

  }

Sander Bollen's avatar
Sander Bollen committed
243
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
244 245 246 247 248 249
   * Perform varda analysis
   *
   * @param vcf input vcf
   * @param gVcf The gVCF to be used for coverage calculations
   * @return return vcf
   */
250
  def varda(vcf: File, gVcf: File): File = {
251

Sander Bollen's avatar
Sander Bollen committed
252
    val annotationQueries: List[String] = config("annotation_queries", default = List("GLOBAL *"), namespace = "manwe")
Sander Bollen's avatar
Sander Bollen committed
253 254

    val annotate = new ManweAnnotateVcf(this)
255
    annotate.vcf = vcf
256 257 258
    if (annotationQueries.nonEmpty) {
      annotate.queries = annotationQueries
    }
Sander Bollen's avatar
Sander Bollen committed
259
    annotate.waitToComplete = true
260 261
    annotate.output = swapExt(outputDir, vcf, ".vcf.gz", ".manwe.annot")
    annotate.isIntermediate = true
Sander Bollen's avatar
Sander Bollen committed
262 263 264
    add(annotate)

    val annotatedVcf = new ManweDownloadAfterAnnotate(this, annotate)
265
    annotatedVcf.output = swapExt(outputDir, annotate.output, ".manwe.annot", "manwe.annot.vcf.gz")
266
    add(annotatedVcf)
Sander Bollen's avatar
Sander Bollen committed
267

Sander Bollen's avatar
Sander Bollen committed
268
    val activates = sampleInfo map { x =>
bow's avatar
bow committed
269 270 271 272 273 274 275
      val maybeSampleGroup = x._2.get("varda_group") match {
        case None => Some(Nil)
        case Some(vals) => vals match {
          case xs: List[_] => xs
            .traverse[Option, String] { x => Option(x.toString).filter(_ == x) }
          case otherwise => None
        }
Sander Bollen's avatar
Sander Bollen committed
276
      }
bow's avatar
bow committed
277 278
      val sampleGroup = maybeSampleGroup
        .getOrElse(throw new IllegalArgumentException("Sample tag 'varda_group' is not a list of strings"))
Sander Bollen's avatar
Sander Bollen committed
279 280
      importAndActivateSample(x._1, sampleGroup, vcf, gVcf, annotate)
    }
Sander Bollen's avatar
Sander Bollen committed
281 282

    val finalLn = new Ln(this)
Sander Bollen's avatar
Sander Bollen committed
283
    activates.foreach(x => finalLn.deps :+= x.output)
Sander Bollen's avatar
Sander Bollen committed
284
    finalLn.input = annotatedVcf.output
285
    finalLn.output = swapExt(outputDir, annotatedVcf.output, "manwe.annot.vcf.gz", ".varda_annotated.vcf.gz")
Sander Bollen's avatar
Sander Bollen committed
286
    finalLn.relative = true
Sander Bollen's avatar
Sander Bollen committed
287
    add(finalLn)
Sander Bollen's avatar
Sander Bollen committed
288 289 290 291

    finalLn.output
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
292 293
  def summaryFile = new File(outputDir, "Toucan.summary.json")

294
  def summaryFiles = Map("input_vcf" -> inputVcf, "outputVcf" -> outputVcf)
Peter van 't Hof's avatar
Peter van 't Hof committed
295 296

  def summarySettings = Map()
Sander Bollen's avatar
Sander Bollen committed
297 298 299
}

object Toucan extends PipelineCommand