WriteDependencies.scala 5.11 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
2
3
4
5
6
7
8
9
10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support the LUMC SHARK cluster, which runs
 * SGE. Other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
 * license. For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15
16
package nl.lumc.sasc.biopet.core

Peter van 't Hof's avatar
Peter van 't Hof committed
17
import java.io.{ File, PrintWriter }
18

Peter van 't Hof's avatar
Peter van 't Hof committed
19
import nl.lumc.sasc.biopet.utils.config.Configurable
Peter van 't Hof's avatar
Peter van 't Hof committed
20
21
import nl.lumc.sasc.biopet.utils.{ Logging, ConfigUtils }
import org.broadinstitute.gatk.queue.function.{ CommandLineFunction, QFunction }
22
import scala.collection.mutable
Peter van 't Hof's avatar
Peter van 't Hof committed
23
import scala.collection.mutable.ListBuffer
24
25

/**
 * Generates, via [[WriteDependencies.writeDependencies]], a JSON file that stores information about
 * the jobs given to Queue and the file dependencies between them.
 *
 * @author Peter van 't Hof <p.j.van_t_hof@lumc.nl>
 */
object WriteDependencies extends Logging with Configurable {
  // NOTE(review): presumably null marks this object as a top-level Configurable without a parent — confirm
  val root: Configurable = null

  /**
   * Assigns a unique name to every function.
   *
   * The base name is the config namespace for [[Configurable]] functions and the simple class name
   * otherwise. A per-base-name counter (starting at 1, in the order of `functions`) is appended,
   * giving names of the form `<base>-1`, `<base>-2`, ...
   *
   * @param functions Functions to name
   * @return Map from each function to its unique name
   */
  private def createFunctionNames(functions: Seq[QFunction]): Map[QFunction, String] = {
    val counters: mutable.Map[String, Int] = mutable.Map()
    functions.map { function =>
      val baseName = function match {
        case f: Configurable => f.configNamespace
        case f               => f.getClass.getSimpleName
      }
      val index = counters.getOrElse(baseName, 0) + 1
      counters(baseName) = index
      function -> s"$baseName-$index"
    }.toMap
  }

  /**
   * Generates a JSON file where information about job and file dependencies is stored.
   *
   * The JSON has two top-level keys: `jobs`, a map from job name to its properties (command,
   * inputs, outputs, dependencies, done/fail state), and `files`, a list with one entry per file
   * that is read or written by any job.
   *
   * Input files that are not produced by any job and do not exist are reported: with the config
   * value `error_on_missing_input` set to true they are registered via `Logging.addError`,
   * otherwise only a warning is logged.
   *
   * @param functions This should be all functions that are given to the graph of Queue
   * @param outputFile Json file to write dependencies to
   */
  def writeDependencies(functions: Seq[QFunction], outputFile: File): Unit = {
    logger.info("Start calculating dependencies")

    val errorOnMissingInput: Boolean = config("error_on_missing_input", false)

    val functionNames = createFunctionNames(functions)

    /** Tracks, for a single file, which jobs read it and which jobs write it. */
    class QueueFile(val file: File) {
      private val inputJobs: ListBuffer[QFunction] = ListBuffer()
      private val outputJobs: ListBuffer[QFunction] = ListBuffer()

      def addInputJob(function: QFunction): Unit = inputJobs += function

      def addOutputJob(function: QFunction): Unit = {
        // Several producers are only reported, the job is still recorded
        if (outputJobs.nonEmpty) logger.warn(s"File '$file' is found as output of multiple jobs")
        outputJobs += function
      }

      def inputJobNames: List[String] = inputJobs.toList.map(functionNames)

      def outputJobNames: List[String] = outputJobs.toList.map(functionNames)

      /** True when any job that writes this file is marked intermediate. */
      def isIntermediate: Boolean = outputJobs.exists(_.isIntermediate)

      /** JSON-ready properties of this file; also reports missing pipeline input files. */
      def getMap: Map[String, Any] = {
        val fileExist = file.exists()
        val pipelineInput = outputJobs.isEmpty
        if (!fileExist && pipelineInput) {
          val message = s"Input file does not exist: $file"
          if (errorOnMissingInput) Logging.addError(message)
          else logger.warn(message)
        }
        Map(
          "path" -> file.getAbsolutePath,
          "intermediate" -> isIntermediate,
          "output_jobs" -> outputJobNames,
          "input_jobs" -> inputJobNames,
          "exists_at_start" -> fileExist,
          "pipeline_input" -> pipelineInput
        )
      }
    }

    val files: mutable.Map[File, QueueFile] = mutable.Map()

    // Declared outputs plus the job log files; log files that are not set (null) are skipped so no
    // QueueFile is ever created for a null path
    def outputFiles(function: QFunction): Seq[File] =
      function.outputs ++ Option(function.jobOutputFile) ++ Option(function.jobErrorFile)

    for (function <- functions) {
      for (input <- function.inputs)
        files.getOrElseUpdate(input, new QueueFile(input)).addInputJob(function)
      for (output <- outputFiles(function))
        files.getOrElseUpdate(output, new QueueFile(output)).addOutputJob(function)
    }

    val jobs = functionNames.par.map {
      case (f, name) =>
        val inputs = f.inputs.toList
        val outputs = outputFiles(f).toList
        val command = f match {
          case cmd: CommandLineFunction => cmd.commandLine
          case _                        => None
        }
        name -> Map(
          "command" -> command,
          "intermediate" -> f.isIntermediate,
          "depends_on_intermediate" -> inputs.exists(files(_).isIntermediate),
          "depends_on_jobs" -> inputs.flatMap(files(_).outputJobNames).distinct,
          "output_used_by_jobs" -> outputs.flatMap(files(_).inputJobNames).distinct,
          "outputs" -> outputs,
          "inputs" -> inputs,
          "done_at_start" -> f.isDone,
          "fail_at_start" -> f.isFail)
    }.toIterator.toMap

    logger.info(s"Writing dependencies to: $outputFile")

    // Build the JSON before opening the file so a failure here does not leave a truncated file behind
    val json = ConfigUtils.mapToJson(Map(
      "jobs" -> jobs,
      "files" -> files.values.par.map(_.getMap).toList
    )).spaces2

    val writer = new PrintWriter(outputFile)
    try {
      writer.println(json)
      // PrintWriter swallows IOExceptions; checkError flushes and reports whether one occurred
      if (writer.checkError()) logger.error(s"Error while writing dependencies to: $outputFile")
    } finally {
      writer.close()
    }

    logger.info("done calculating dependencies")
  }
}