WriteDependencies.scala 6.01 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1 2 3 4 5 6 7 8 9 10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15 16
package nl.lumc.sasc.biopet.core

Peter van 't Hof's avatar
Peter van 't Hof committed
17
import java.io.{ File, PrintWriter }
18

19
import nl.lumc.sasc.biopet.core.summary.WriteSummary
Peter van 't Hof's avatar
Peter van 't Hof committed
20
import nl.lumc.sasc.biopet.utils.config.Configurable
Peter van 't Hof's avatar
Peter van 't Hof committed
21 22
import nl.lumc.sasc.biopet.utils.{ ConfigUtils, Logging }
import org.broadinstitute.gatk.queue.function.{ CommandLineFunction, QFunction }
23

24
import scala.collection.mutable
Peter van 't Hof's avatar
Peter van 't Hof committed
25
import scala.collection.mutable.ListBuffer
26 27

/**
 * Generates, through [[WriteDependencies.writeDependencies]], a json file that stores the
 * dependencies between the jobs and the files of a pipeline.
 *
 * @author Peter van 't Hof <p.j.van_t_hof@lumc.nl>
 */
object WriteDependencies extends Logging with Configurable {
  // NOTE(review): presumably a null root marks this object as the top of the config tree;
  // confirm against Configurable.
  val root: Configurable = null

  /**
   * Assigns a unique, human-readable name to every function.
   *
   * The name is the config namespace (for [[Configurable]] functions) or the simple class name
   * (for all other functions), with every '-' replaced by '_' and a 1-based counter appended.
   * The counter is kept per sanitized name, so base names that only differ in '-' versus '_'
   * can never end up with the same generated name.
   *
   * @param functions functions to name; the numbering follows the order of this sequence
   * @return map from each function to its generated name
   */
  private def createFunctionNames(functions: Seq[QFunction]): Map[QFunction, String] = {
    val counts: mutable.Map[String, Int] = mutable.Map()
    functions.map { function =>
      val baseName = function match {
        case f: Configurable => f.configNamespace
        case f               => f.getClass.getSimpleName
      }
      val safeName = baseName.replace('-', '_')
      val index = counts.getOrElse(safeName, 0) + 1
      counts(safeName) = index
      function -> s"${safeName}_$index"
    }.toMap
  }

  /**
   * Writes `deps.json` into `outputDir`, a json file where information about job and file
   * dependencies is stored, and afterwards passes that file to [[PipelineStatus]].
   *
   * @param functions This should be all functions that are given to the graph of Queue
   * @param outputDir Directory in which `deps.json` is written; it is created when missing
   */
  def writeDependencies(functions: Seq[QFunction], outputDir: File): Unit = {
    outputDir.mkdirs()
    logger.info("Start calculating dependencies")

    val errorOnMissingInput: Boolean = config("error_on_missing_input", false)

    val functionNames = createFunctionNames(functions)

    /** Bookkeeping for a single file: the jobs that read it and the jobs that write it. */
    case class QueueFile(file: File) {
      private val inputJobs: ListBuffer[QFunction] = ListBuffer()
      private val outputJobs: ListBuffer[QFunction] = ListBuffer()

      /** Registers a job that uses this file as input. */
      def addInputJob(function: QFunction): Unit = inputJobs += function

      /** Registers a job that produces this file; warns when another producer was already registered. */
      def addOutputJob(function: QFunction): Unit = {
        if (outputJobs.nonEmpty) logger.warn(s"File '$file' is found as output of multiple jobs")
        outputJobs += function
      }

      /** Names of the jobs that read this file. */
      def inputJobNames: List[String] = inputJobs.toList.map(functionNames)

      /** Names of the jobs that write this file. */
      def outputJobNames: List[String] = outputJobs.toList.map(functionNames)

      /** True when at least one of the jobs that write this file is marked as intermediate. */
      def isIntermediate: Boolean = outputJobs.exists(_.isIntermediate)

      /**
       * Builds the entry for this file as stored in the json file.
       *
       * A file that does not exist and is not produced by any job is reported as a missing
       * input: as an error when `error_on_missing_input` is set, otherwise as a warning.
       */
      def getMap: Map[String, Any] = {
        val fileExist = file.exists()
        if (!fileExist && outputJobs.isEmpty) {
          if (errorOnMissingInput) Logging.addError(s"Input file does not exist: $file")
          else logger.warn(s"Input file does not exist: $file")
        }
        Map(
          "path" -> file.getAbsolutePath,
          "intermediate" -> isIntermediate,
          "output_jobs" -> outputJobNames,
          "input_jobs" -> inputJobNames,
          "exists_at_start" -> fileExist,
          "pipeline_input" -> outputJobs.isEmpty
        )
      }
    }

    val files: mutable.Map[File, QueueFile] = mutable.Map()

    // Returns the bookkeeping entry of a file, creating and storing it on first use
    def queueFile(file: File): QueueFile = files.getOrElseUpdate(file, QueueFile(file))

    for (function <- functions) {
      for (input <- BiopetQScript.safeInputs(function).getOrElse(Seq()))
        queueFile(input).addInputJob(function)
      for (output <- BiopetQScript.safeOutputs(function).getOrElse(Seq()))
        queueFile(output).addOutputJob(function)
      // The stdout file of a job is treated as one of its outputs
      queueFile(function.jobOutputFile).addOutputJob(function)
    }

    val jobs = functionNames.par.map {
      case (f, name) =>
        // Inputs and outputs are resolved once per job and reused for every derived field
        val inputs = BiopetQScript.safeInputs(f).getOrElse(Seq()).toList
        val outputs = BiopetQScript.safeOutputs(f).getOrElse(Seq()).toList

        val command: Any = f match {
          case cmd: CommandLineFunction => cmd.commandLine
          case _                        => None
        }
        val mainJob = f match {
          case cmd: BiopetCommandLineFunction            => cmd.mainFunction
          case s: WriteSummary if s.qscript.root == null => true
          case _                                         => false
        }

        name -> Map(
          "command" -> command,
          "main_job" -> mainJob,
          "intermediate" -> f.isIntermediate,
          "depends_on_intermediate" -> inputs.exists(files(_).isIntermediate),
          "depends_on_jobs" -> inputs.flatMap(files(_).outputJobNames).distinct,
          "output_used_by_jobs" -> outputs.flatMap(files(_).inputJobNames).distinct,
          "outputs" -> (f.jobOutputFile :: outputs),
          "inputs" -> inputs,
          "done_files" -> BiopetQScript.safeDoneFiles(f).getOrElse(Seq()).toList,
          "fail_files" -> BiopetQScript.safeFailFiles(f).getOrElse(Seq()).toList,
          "stdout_file" -> f.jobOutputFile,
          "done_at_start" -> BiopetQScript.safeIsDone(f),
          "fail_at_start" -> BiopetQScript.safeIsFail(f))
    }.toIterator.toMap

    val outputFile = new File(outputDir, "deps.json")
    logger.info(s"Writing dependencies to: $outputFile")

    // Serialize before the file is opened, so a failure here does not leave a truncated file
    val json = ConfigUtils.mapToJson(Map(
      "jobs" -> jobs,
      "files" -> files.values.par.map(_.getMap).toList
    )).spaces2

    val writer = new PrintWriter(outputFile)
    // The writer is always released, also when writing fails
    try writer.println(json)
    finally writer.close()

    PipelineStatus.writePipelineStatus(PipelineStatus.readDepsFile(outputFile), outputDir)
    logger.info("done calculating dependencies")
  }

}