CommandLineResources.scala 4.04 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
2
3
4
5
6
7
8
9
10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15
16
17
18
19
20
package nl.lumc.sasc.biopet.core

import nl.lumc.sasc.biopet.utils.config.Configurable
import org.broadinstitute.gatk.queue.function.CommandLineFunction

/**
 * Controls the resources (threads, memory, resident and virtual memory) requested
 * for a CommandLineFunction, based on config values and per-tool defaults.
 *
 * Resources are (re)computed in [[setResources]], which runs on every
 * [[freezeFieldValues]] call, including the one triggered by [[setupRetry]].
 */
trait CommandLineResources extends CommandLineFunction with Configurable {

  /** Number of threads used when "threads" is not set in the config */
  def defaultThreads: Int = 1

  /**
   * Number of threads of this job. Taken from `nCoresRequest` when that is already
   * set; otherwise resolved once through [[getThreads]] and stored in `nCoresRequest`.
   */
  final def threads: Int = nCoresRequest match {
    case Some(requested) => requested
    case _ =>
      val resolved = getThreads
      nCoresRequest = Some(resolved)
      resolved
  }

  /** When true the vmem request is multiplied by the number of threads */
  val multiplyVmemThreads: Boolean = config("multiply_vmem_threads", default = true)

  /** When true the resident memory limit is multiplied by the number of threads */
  val multiplyRssThreads: Boolean = config("multiply_rss_threads", default = true)

  /** Value used for the "h_vmem=" job resource request, from config or computed */
  var vmem: Option[String] = config("vmem")

  /** Core memory used when "core_memory" is not set in the config */
  def defaultCoreMemory: Double = 2.0

  /** Vmem factor used when "vmem_factor" is not set in the config */
  def defaultVmemFactor: Double = 1.4

  /** Factor applied to the core memory to compute the vmem request */
  var vmemFactor: Double = config("vmem_factor", default = defaultVmemFactor)

  /** Factor applied to the core memory to compute the resident memory limit */
  var residentFactor: Double = config("resident_factor", default = 1.2)

  private var _coreMemory: Double = 2.0

  /** Core memory as computed by the last [[setResources]] / [[combineResources]] call */
  def coreMemory: Double = _coreMemory

  /** Number of times [[setupRetry]] has been called for this job */
  var retry: Int = 0

  /** Removes every "h_vmem=" entry from the job resource requests */
  private def dropVmemRequests(): Unit =
    jobResourceRequests = jobResourceRequests.filterNot(_.contains("h_vmem="))

  /**
   * Computes the resources and registers the "h_vmem=" request before the field
   * values are frozen by the parent implementation.
   */
  override def freezeFieldValues(): Unit = {
    setResources()
    vmem.foreach { value =>
      // Replace any earlier request so that freezing more than once does not
      // leave several "h_vmem=" entries behind.
      dropVmemRequests()
      jobResourceRequests :+= "h_vmem=" + value
    }
    super.freezeFieldValues()
  }

  /** Get threads from config, falling back on [[defaultThreads]] */
  def getThreads: Int = getThreads(defaultThreads)

  /**
   * Get threads from config
   * @param default default when not found in config
   * @return number of threads, capped by "maxthreads" when that is configured
   */
  private def getThreads(default: Int): Int = {
    val maxThreads: Option[Int] = config("maxthreads")
    val configured: Int = config("threads", default = default)
    maxThreads.fold(configured)(max => math.min(max, configured))
  }

  /**
   * Sets the core request, memory limit, resident limit, vmem and job name of this
   * function. Values for "memory_limit", "resident_limit" and "vmem" present in the
   * config take precedence over the computed ones.
   */
  def setResources(): Unit = {
    // Looking up the first output can throw a NullPointerException here; this is
    // treated the same as having no output.
    val outputFile = try {
      Option(this.firstOutput)
    } catch {
      case _: NullPointerException => None
    }

    nCoresRequest = Some(threads)
    val nThreads = threads

    // The first retry does not raise the memory yet
    val retryMultipler = if (retry > 1) retry - 1 else 0

    _coreMemory = config("core_memory", default = defaultCoreMemory).asDouble +
      (0.5 * retryMultipler)

    if (config.contains("memory_limit")) memoryLimit = config("memory_limit")
    else memoryLimit = Some(_coreMemory * nThreads)

    // NOTE(review): _coreMemory already contains the retry increment, so it is
    // added a second time in the resident limit below - confirm this is intended.
    if (config.contains("resident_limit")) residentLimit = config("resident_limit")
    else {
      val rssThreads = if (multiplyRssThreads) nThreads else 1
      residentLimit = Some((_coreMemory + (0.5 * retryMultipler)) * residentFactor * rssThreads)
    }

    if (!config.contains("vmem")) {
      val vmemThreads = if (multiplyVmemThreads) nThreads else 1
      vmem = Some((_coreMemory * (vmemFactor + (0.5 * retryMultipler)) * vmemThreads) + "G")
    }

    // jobOutputFile is only consulted when there is no first output
    val jobLabel: String = outputFile.map(_.getName).getOrElse(String.valueOf(jobOutputFile))
    jobName = configNamespace + ":" + jobLabel
  }

  /**
   * Prepares a retry: drops the previous "h_vmem=" request, raises the retry
   * counter and freezes the field values again so the resources are recomputed.
   */
  override def setupRetry(): Unit = {
    super.setupRetry()
    if (vmem.isDefined) dropVmemRequests()
    if (retry > 0) logger.info("Auto raise memory on retry")
    retry += 1
    this.freezeFieldValues()
  }

  /** Correction added to the summed thread count in [[combineResources]] */
  var threadsCorrection: Int = 0

  /**
   * Sets the resources of this function to the combination of the given commands.
   *
   * The core request is the sum of the threads of all commands plus
   * [[threadsCorrection]]; the core memory is the thread-weighted average of the
   * core memory of the commands. The resident limit and vmem honour
   * [[multiplyRssThreads]] and [[multiplyVmemThreads]], like [[setResources]] does.
   *
   * @param commands commands whose resources should be combined
   */
  protected def combineResources(commands: List[CommandLineResources]): Unit = {
    commands.foreach(_.setResources())
    nCoresRequest = Some(commands.map(_.threads).sum + threadsCorrection)
    val totalThreads = threads

    _coreMemory = commands.map(cmd => cmd.coreMemory * (cmd.threads.toDouble / totalThreads.toDouble)).sum
    memoryLimit = Some(_coreMemory * totalThreads)

    val rssThreads = if (multiplyRssThreads) totalThreads else 1
    residentLimit = Some((_coreMemory + (0.5 * retry)) * residentFactor * rssThreads)

    val vmemThreads = if (multiplyVmemThreads) totalThreads else 1
    vmem = Some((_coreMemory * (vmemFactor + (0.5 * retry)) * vmemThreads) + "G")
  }

}