WriteDependencies.scala 5.67 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
2
3
4
5
6
7
8
9
10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15
16
package nl.lumc.sasc.biopet.core

Peter van 't Hof's avatar
Peter van 't Hof committed
17
import java.io.{ File, PrintWriter }
18

19
import nl.lumc.sasc.biopet.core.summary.WriteSummary
Peter van 't Hof's avatar
Peter van 't Hof committed
20
import nl.lumc.sasc.biopet.utils.config.Configurable
Peter van 't Hof's avatar
Peter van 't Hof committed
21
22
import nl.lumc.sasc.biopet.utils.{ ConfigUtils, Logging }
import org.broadinstitute.gatk.queue.function.{ CommandLineFunction, QFunction }
23

24
import scala.collection.mutable
Peter van 't Hof's avatar
Peter van 't Hof committed
25
import scala.collection.mutable.ListBuffer
26
27

/**
 * This object generates, via [[WriteDependencies.writeDependencies]], a JSON file in which information about job and
 * file dependencies is stored.
 *
 * @author Peter van 't Hof <p.j.van_t_hof@lumc.nl>
 */
Peter van 't Hof's avatar
Peter van 't Hof committed
32
33
object WriteDependencies extends Logging with Configurable {
  // This object has no parent Configurable.
  // NOTE(review): presumably a null root marks a top-level config node — confirm against Configurable.
  val root: Configurable = null

  /**
   * Assigns a name to every function, of the form `<baseName>_<n>`.
   *
   * The base name is the `configNamespace` for functions that are [[Configurable]] and the simple class name
   * otherwise, with every `-` replaced by `_`. `n` is a 1-based counter per base name, in the order of `functions`.
   *
   * @param functions Functions to name
   * @return Map from each function to its generated name
   */
  private def createFunctionNames(functions: Seq[QFunction]): Map[QFunction, String] = {
    // Running count per normalised base name; the current count becomes the numeric suffix.
    val counts: mutable.Map[String, Int] = mutable.Map()
    functions.map { function =>
      val rawName = function match {
        case f: Configurable => f.configNamespace
        case f               => f.getClass.getSimpleName
      }
      // Normalise *before* counting: otherwise "a-b" and "a_b" get separate counters and both end up as "a_b_1",
      // which makes the generated names collide.
      val baseName = rawName.replaceAll("-", "_")
      val index = counts.getOrElse(baseName, 0) + 1
      counts(baseName) = index
      function -> s"${baseName}_$index"
    }.toMap
  }

  /**
   * Writes a JSON file (`deps.json`) describing the jobs and files of a pipeline and how they depend on each other.
   *
   * The file holds a `jobs` object keyed by generated job name and a `files` list with one entry per file that is
   * an input or output of any job. The stdout file of a job, and its error file when set, count as outputs.
   * After writing, the file is passed to `PipelineStatus.writePipelineStatus`.
   *
   * A file that does not exist and is not produced by any job is reported as a missing input: as a warning by
   * default, or through `Logging.addError` when the config value `error_on_missing_input` is true.
   *
   * @param functions This should be all functions that are given to the graph of Queue
   * @param outputDir Directory in which `deps.json` is written; it is created when it does not exist yet
   */
  def writeDependencies(functions: Seq[QFunction], outputDir: File): Unit = {
    outputDir.mkdirs()
    logger.info("Start calculating dependencies")

    val errorOnMissingInput: Boolean = config("error_on_missing_input", false)

    val functionNames = createFunctionNames(functions)

    /** Book-keeping for a single file: the jobs that read it and the jobs that write it. */
    case class QueueFile(file: File) {
      private val inputJobs: ListBuffer[QFunction] = ListBuffer()
      private val outputJobs: ListBuffer[QFunction] = ListBuffer()

      /** Registers `function` as a job that reads this file. */
      def addInputJob(function: QFunction) = inputJobs += function

      /** Registers `function` as a job that writes this file; warns when another job already writes it. */
      def addOutputJob(function: QFunction) = {
        if (outputJobs.nonEmpty) logger.warn(s"File '$file' is found as output of multiple jobs")
        outputJobs += function
      }

      /** Names of the jobs that read this file. */
      def inputJobNames: List[String] = inputJobs.toList.map(functionNames)

      /** Names of the jobs that write this file. */
      def outputJobNames: List[String] = outputJobs.toList.map(functionNames)

      /** True when at least one job that writes this file is flagged as intermediate. */
      def isIntermediate: Boolean = outputJobs.exists(_.isIntermediate)

      /** Builds the JSON-ready description of this file and reports it when it is a missing pipeline input. */
      def getMap: Map[String, Any] = {
        val fileExist = file.exists()
        // A file without a producing job is a pipeline input, so it has to be there before the run starts.
        if (!fileExist && outputJobs.isEmpty) {
          if (errorOnMissingInput) Logging.addError(s"Input file does not exist: $file")
          else logger.warn(s"Input file does not exist: $file")
        }
        Map(
          "path" -> file.getAbsolutePath,
          "intermediate" -> isIntermediate,
          "output_jobs" -> outputJobNames,
          "input_jobs" -> inputJobNames,
          "exists_at_start" -> fileExist,
          "pipeline_input" -> outputJobs.isEmpty
        )
      }
    }

    val files: mutable.Map[File, QueueFile] = mutable.Map()

    // All files a job writes: its declared outputs, its stdout file and, when set, its error file.
    def outputFiles(function: QFunction) = {
      if (function.jobErrorFile == null) function.outputs :+ function.jobOutputFile
      else function.outputs :+ function.jobOutputFile :+ function.jobErrorFile
    }

    // Register every job on each file it reads or writes, creating the file entry on first sight.
    for (function <- functions) {
      for (input <- function.inputs)
        files.getOrElseUpdate(input, QueueFile(input)).addInputJob(function)
      for (output <- outputFiles(function))
        files.getOrElseUpdate(output, QueueFile(output)).addOutputJob(function)
    }

    val jobs = functionNames.par.map {
      case (f, name) =>
        // Only command line functions carry a command; for any other function None is stored.
        val command = f match {
          case cmd: CommandLineFunction => cmd.commandLine
          case _                        => None
        }
        val mainJob = f match {
          case cmd: BiopetCommandLineFunction            => cmd.mainFunction
          case s: WriteSummary if s.qscript.root == null => true
          case _                                         => false
        }
        val jobInputs = f.inputs.toList
        val jobOutputs = outputFiles(f).toList
        name -> Map(
          "command" -> command,
          "main_job" -> mainJob,
          "intermediate" -> f.isIntermediate,
          "depends_on_intermediate" -> f.inputs.exists(files(_).isIntermediate),
          "depends_on_jobs" -> jobInputs.flatMap(files(_).outputJobNames).distinct,
          "output_used_by_jobs" -> jobOutputs.flatMap(files(_).inputJobNames).distinct,
          "outputs" -> jobOutputs,
          "inputs" -> jobInputs,
          "done_files" -> f.doneOutputs.toList,
          "fail_files" -> f.failOutputs.toList,
          "stdout_file" -> f.jobOutputFile,
          "done_at_start" -> f.isDone,
          "fail_at_start" -> f.isFail)
    }.toIterator.toMap

    val outputFile = new File(outputDir, "deps.json")
    logger.info(s"Writing dependencies to: $outputFile")
    val writer = new PrintWriter(outputFile)
    // Close the writer even when building or serialising the map throws, so the file handle never leaks.
    try {
      writer.println(ConfigUtils.mapToJson(Map(
        "jobs" -> jobs,
        "files" -> files.values.par.map(_.getMap).toList
      )).spaces2)
    } finally {
      writer.close()
    }

    PipelineStatus.writePipelineStatus(outputFile, outputDir)
    logger.info("done calculating dependencies")
  }

}