WriteDependencies.scala 6.01 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
2
3
4
5
6
7
8
9
10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15
16
package nl.lumc.sasc.biopet.core

Peter van 't Hof's avatar
Peter van 't Hof committed
17
import java.io.{ File, PrintWriter }
18

19
import nl.lumc.sasc.biopet.core.summary.WriteSummary
Peter van 't Hof's avatar
Peter van 't Hof committed
20
import nl.lumc.sasc.biopet.utils.config.Configurable
Peter van 't Hof's avatar
Peter van 't Hof committed
21
22
import nl.lumc.sasc.biopet.utils.{ ConfigUtils, Logging }
import org.broadinstitute.gatk.queue.function.{ CommandLineFunction, QFunction }
23

24
import scala.collection.mutable
Peter van 't Hof's avatar
Peter van 't Hof committed
25
import scala.collection.mutable.ListBuffer
26
27

/**
 * Generates, through [[WriteDependencies.writeDependencies]], a json file that stores information about
 * job and file dependencies.
 *
 * @author Peter van 't Hof <p.j.van_t_hof@lumc.nl>
 */
Peter van 't Hof's avatar
Peter van 't Hof committed
32
object WriteDependencies extends Logging with Configurable {

  /** This object is not nested under another [[Configurable]], so it has no parent. */
  val parent: Configurable = null

  /**
   * Assigns a unique name to every function.
   *
   * The base name is the config namespace for [[Configurable]] functions and the simple class name otherwise.
   * Every `-` is replaced by `_`, and a counter starting at 1 is appended per sanitized base name.
   * For example, two functions with namespace `foo-bar` become `foo_bar_1` and `foo_bar_2`.
   *
   * @param functions Functions to name; they are numbered in the order given
   * @return Map from each function to its generated name
   */
  private def createFunctionNames(functions: Seq[QFunction]): Map[QFunction, String] = {
    val counters: mutable.Map[String, Int] = mutable.Map()
    functions.map { function =>
      val baseName = function match {
        case f: Configurable => f.configNamespace
        case f               => f.getClass.getSimpleName
      }
      // Count per *sanitized* name. If the count were keyed on the raw name, `foo-bar` and `foo_bar` would
      // both become `foo_bar_1`. That would collapse two jobs into a single entry of the json `jobs` map.
      val safeName = baseName.replaceAll("-", "_")
      val count = counters.getOrElse(safeName, 0) + 1
      counters(safeName) = count
      function -> s"${safeName}_$count"
    }.toMap
  }

  /**
   * Writes `deps.json` into `outputDir`. The file describes every job and every file known to the given functions.
   * Afterwards the file is read back with `PipelineStatus.readDepsFile`, and the result is passed to
   * `PipelineStatus.writePipelineStatus`.
   *
   * The json has two top-level keys:
   *  - `jobs`: per job name, its command, inputs/outputs, the jobs it depends on, the jobs that use its outputs,
   *    and the done/fail state at the start.
   *  - `files`: per file, its path, producing/consuming jobs, whether it is intermediate (produced by an
   *    intermediate job) and whether it existed at the start.
   *
   * A file that does not exist and is not produced by any job is treated as a missing input. It is recorded with
   * `Logging.addError` when the config value `error_on_missing_input` (default `false`) is true. Otherwise it is
   * logged as a warning.
   *
   * @param functions This should be all functions that are given to the graph of Queue
   * @param outputDir Directory in which `deps.json` is written; it is created when it does not exist yet
   */
  def writeDependencies(functions: Seq[QFunction], outputDir: File): Unit = {
    outputDir.mkdirs()
    logger.info("Start calculating dependencies")

    val errorOnMissingInput: Boolean = config("error_on_missing_input", false)

    val functionNames = createFunctionNames(functions)

    /** Bookkeeping for a single file: the jobs that read it and the jobs that write it. */
    case class QueueFile(file: File) {
      private val inputJobs: ListBuffer[QFunction] = ListBuffer()
      private val outputJobs: ListBuffer[QFunction] = ListBuffer()

      def addInputJob(function: QFunction): Unit = inputJobs += function

      def addOutputJob(function: QFunction): Unit = {
        // The same job can register a file more than once, for example if its `jobOutputFile` is also reported
        // by `safeOutputs`. Only a *different* job writing this file deserves the warning.
        if (!outputJobs.contains(function)) {
          if (outputJobs.nonEmpty) logger.warn(s"File '$file' is found as output of multiple jobs")
          outputJobs += function
        }
      }

      def inputJobNames: List[String] = inputJobs.toList.map(functionNames)

      def outputJobNames: List[String] = outputJobs.toList.map(functionNames)

      /** A file is intermediate when any job producing it is intermediate. */
      def isIntermediate: Boolean = outputJobs.exists(_.isIntermediate)

      /** Json representation of this file. It also reports the file when it is a missing pipeline input. */
      def getMap: Map[String, Any] = {
        val fileExist = file.exists()
        if (!fileExist && outputJobs.isEmpty) {
          if (errorOnMissingInput) Logging.addError(s"Input file does not exist: $file")
          else logger.warn(s"Input file does not exist: $file")
        }
        Map(
          "path" -> file.getAbsolutePath,
          "intermediate" -> isIntermediate,
          "output_jobs" -> outputJobNames,
          "input_jobs" -> inputJobNames,
          "exists_at_start" -> fileExist,
          "pipeline_input" -> outputJobs.isEmpty
        )
      }
    }

    val files: mutable.Map[File, QueueFile] = mutable.Map()

    /** Returns the existing bookkeeping entry for `file`, creating it on first use. */
    def queueFile(file: File): QueueFile = files.getOrElseUpdate(file, QueueFile(file))

    // The safe accessors return an optional result; an absent result is treated as "no files".
    def inputsOf(f: QFunction): List[File] = BiopetQScript.safeInputs(f).getOrElse(Seq()).toList
    def outputsOf(f: QFunction): List[File] = BiopetQScript.safeOutputs(f).getOrElse(Seq()).toList

    for (function <- functions) {
      inputsOf(function).foreach(queueFile(_).addInputJob(function))
      outputsOf(function).foreach(queueFile(_).addOutputJob(function))
      // The job's stdout file is also counted as one of its outputs.
      queueFile(function.jobOutputFile).addOutputJob(function)
    }

    val jobs = functionNames.par.map {
      case (f, name) =>
        val inputs = inputsOf(f)
        val outputs = outputsOf(f)
        name -> Map(
          "command" -> (f match {
            case cmd: CommandLineFunction => cmd.commandLine
            case _                        => None
          }),
          "main_job" -> (f match {
            case cmd: BiopetCommandLineFunction              => cmd.mainFunction
            case s: WriteSummary if s.qscript.parent == null => true
            case _                                           => false
          }),
          "intermediate" -> f.isIntermediate,
          "depends_on_intermediate" -> inputs.exists(files(_).isIntermediate),
          "depends_on_jobs" -> inputs.flatMap(files(_).outputJobNames).distinct,
          "output_used_by_jobs" -> outputs.flatMap(files(_).inputJobNames).distinct,
          "outputs" -> (f.jobOutputFile :: outputs),
          "inputs" -> inputs,
          "done_files" -> BiopetQScript.safeDoneFiles(f).getOrElse(Seq()).toList,
          "fail_files" -> BiopetQScript.safeFailFiles(f).getOrElse(Seq()).toList,
          "stdout_file" -> f.jobOutputFile,
          "done_at_start" -> BiopetQScript.safeIsDone(f),
          "fail_at_start" -> BiopetQScript.safeIsFail(f)
        )
    }.toIterator.toMap

    val outputFile = new File(outputDir, "deps.json")
    logger.info(s"Writing dependencies to: $outputFile")

    // Build the complete json before opening the file. A failure while collecting the data then does not
    // leave behind a truncated deps.json.
    val json = ConfigUtils.mapToJson(Map(
      "jobs" -> jobs,
      "files" -> files.values.par.map(_.getMap).toList
    )).spaces2

    val writer = new PrintWriter(outputFile)
    try writer.println(json)
    finally writer.close()

    PipelineStatus.writePipelineStatus(PipelineStatus.readDepsFile(outputFile), outputDir)
    logger.info("done calculating dependencies")
  }
}