WriteDependencies.scala 4.47 KB
Newer Older
1
2
package nl.lumc.sasc.biopet.core

Peter van 't Hof's avatar
Peter van 't Hof committed
3
import java.io.{ File, PrintWriter }
4

Peter van 't Hof's avatar
Peter van 't Hof committed
5
import nl.lumc.sasc.biopet.utils.config.Configurable
Peter van 't Hof's avatar
Peter van 't Hof committed
6
7
import nl.lumc.sasc.biopet.utils.{ Logging, ConfigUtils }
import org.broadinstitute.gatk.queue.function.{ CommandLineFunction, QFunction }
8
import scala.collection.mutable
Peter van 't Hof's avatar
Peter van 't Hof committed
9
import scala.collection.mutable.ListBuffer
10
11

/**
 * This object will generate with [[WriteDependencies.writeDependencies]] a json file where information about job and
 * file dependencies are stored.
 *
 * @author Peter van 't Hof <p.j.van_t_hof@lumc.nl>
 */
object WriteDependencies extends Logging with Configurable {
  /** This object has no enclosing [[Configurable]], so `root` is left null. */
  val root: Configurable = null

  /** Unique job name per function; rebuilt by [[createFunctionNames]] at the start of every [[writeDependencies]] call. */
  private val functionNames: mutable.Map[QFunction, String] = mutable.Map()

  /**
   * Gives every function a unique name of the form `<baseName>-<n>`.
   *
   * `baseName` is the config name for [[Configurable]] functions and the simple class name otherwise. `n` counts
   * the functions sharing that base name, starting at 1.
   *
   * Names from a previous call are dropped first. Otherwise stale functions would leak into the job list of the
   * next json file, and their inputs would be looked up in a file table that does not contain them.
   *
   * @param functions Functions to name, in the order the counters are assigned
   */
  private def createFunctionNames(functions: Seq[QFunction]): Unit = {
    functionNames.clear()
    val counters: mutable.Map[String, Int] = mutable.Map()
    for (function <- functions) {
      val baseName = function match {
        case f: Configurable => f.configName
        case f               => f.getClass.getSimpleName
      }
      val index = counters.getOrElse(baseName, 0) + 1
      counters += baseName -> index
      functionNames += function -> s"$baseName-$index"
    }
  }

  /**
   * This method will generate a json file where information about job and file dependencies are stored.
   *
   * A file that does not exist and is not written by any job is reported as a missing input. By default this is a
   * warning. When config value `error_on_missing_input` is true, it is registered through `Logging.addError`
   * instead.
   *
   * The json is written as UTF-8. It is fully rendered before `outputFile` is opened, so a failure while
   * collecting the data leaves any existing file untouched.
   *
   * @param functions This should be all functions that are given to the graph of Queue
   * @param outputFile Json file to write dependencies to
   */
  def writeDependencies(functions: Seq[QFunction], outputFile: File): Unit = {
    logger.info("Start calculating dependencies")

    val errorOnMissingInput: Boolean = config("error_on_missing_input", false)

    createFunctionNames(functions)

    /** Bookkeeping for one file: the jobs reading it (input jobs) and the jobs writing it (output jobs). */
    case class QueueFile(file: File) {
      private val inputJobs: ListBuffer[QFunction] = ListBuffer()
      def addInputJob(function: QFunction) = inputJobs += function
      def inputJobNames = inputJobs.toList.map(functionNames)

      private val outputJobs: ListBuffer[QFunction] = ListBuffer()
      def addOutputJob(function: QFunction) = {
        if (outputJobs.nonEmpty) logger.warn(s"File '$file' is found as output of multiple jobs")
        outputJobs += function
      }
      def outputJobNames = outputJobs.toList.map(functionNames)

      /** Json-ready summary of this file; also reports the file when it is a missing pipeline input. */
      def getMap = {
        val fileExist = file.exists()
        // A file that no job writes has to be present already at the start of the pipeline
        if (!fileExist && outputJobs.isEmpty) {
          if (errorOnMissingInput) Logging.addError(s"Input file does not exist: $file")
          else logger.warn(s"Input file does not exist: $file")
        }
        Map(
          "path" -> file.getAbsolutePath,
          "intermediate" -> isIntermediate,
          "output_jobs" -> outputJobNames,
          "input_jobs" -> inputJobNames,
          "exist_at_start" -> fileExist,
          "pipeline_input" -> outputJobs.isEmpty
        )
      }

      /** True when any job writing this file is marked intermediate. */
      def isIntermediate = outputJobs.exists(_.isIntermediate)
    }

    val files: mutable.Map[File, QueueFile] = mutable.Map()

    /** Files a function writes: its declared outputs, its job output file and, when set, its job error file. */
    def outputFiles(function: QFunction) = {
      if (function.jobErrorFile == null) function.outputs :+ function.jobOutputFile
      else function.outputs :+ function.jobOutputFile :+ function.jobErrorFile
    }

    // Register every function as a reader of its inputs and a writer of its outputs
    for (function <- functions) {
      for (input <- function.inputs) {
        val file = files.getOrElse(input, QueueFile(input))
        file.addInputJob(function)
        files += input -> file
      }
      for (output <- outputFiles(function)) {
        val file = files.getOrElse(output, QueueFile(output))
        file.addOutputJob(function)
        files += output -> file
      }
    }

    // NOTE: the misspelled keys ("depens_on_intermediate", "depens_on_jobs", "ouput_used_by_jobs") are kept
    // verbatim because readers of the generated json may depend on them.
    val jobs = functionNames.par.map {
      case (f, name) =>
        val outputs = outputFiles(f).toList
        name -> Map(
          "command" -> (f match {
            case cmd: CommandLineFunction => cmd.commandLine
            case _                        => None
          }),
          "intermediate" -> f.isIntermediate,
          "depens_on_intermediate" -> f.inputs.exists(files(_).isIntermediate),
          "depens_on_jobs" -> f.inputs.toList.flatMap(files(_).outputJobNames).distinct,
          "ouput_used_by_jobs" -> outputs.flatMap(files(_).inputJobNames).distinct,
          "outputs" -> outputs,
          "inputs" -> f.inputs.toList,
          "done_at_start" -> f.isDone,
          "fail_at_start" -> f.isFail)
    }.toIterator.toMap

    logger.info(s"Writing dependencies to: $outputFile")

    // Render first: opening the PrintWriter truncates outputFile, so nothing may fail after that except the write
    val json = ConfigUtils.mapToJson(Map(
      "jobs" -> jobs,
      "files" -> files.values.par.map(_.getMap).toList
    )).spaces2

    val writer = new PrintWriter(outputFile, "UTF-8")
    try writer.println(json)
    finally writer.close()

    logger.info("done calculating dependencies")
  }
}