CommandLineResources.scala 3.82 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
16
17
18
19
20
21
package nl.lumc.sasc.biopet.core

import nl.lumc.sasc.biopet.utils.config.Configurable
import org.broadinstitute.gatk.queue.function.CommandLineFunction

/**
22
 * This trait will control resources given to a CommandlineFunction
23
24
25
26
27
28
 */
trait CommandLineResources extends CommandLineFunction with Configurable {

  def defaultThreads = 1
  final def threads = nCoresRequest match {
    case Some(i) => i
29
    case _ =>
30
31
32
33
34
35
      val t = getThreads
      nCoresRequest = Some(t)
      t
  }

  var vmem: Option[String] = config("vmem")
Peter van 't Hof's avatar
Peter van 't Hof committed
36
  def defaultCoreMemory: Double = 2.0
37
38
39
40
41
42
  def defaultVmemFactor: Double = 1.4
  var vmemFactor: Double = config("vmem_factor", default = defaultVmemFactor)

  var residentFactor: Double = config("resident_factor", default = 1.2)

  private var _coreMemory: Double = 2.0
Sander van der Zeeuw's avatar
Sander van der Zeeuw committed
43
  def coreMemory = _coreMemory
44
45
46
47
48

  var retry = 0

  override def freezeFieldValues(): Unit = {
    setResources()
Peter van 't Hof's avatar
Peter van 't Hof committed
49
    if (vmem.isDefined) jobResourceRequests :+= "h_vmem=" + vmem.get
50
51
52
53
54
55
56
57
58
59
60
    super.freezeFieldValues()
  }

  def getThreads: Int = getThreads(defaultThreads)

  /**
   * Get threads from config
   * @param default default when not found in config
   * @return number of threads
   */
  private def getThreads(default: Int): Int = {
Peter van 't Hof's avatar
Peter van 't Hof committed
61
    val maxThreads: Option[Int] = config("maxthreads")
62
    val threads: Int = config("threads", default = default)
Peter van 't Hof's avatar
Peter van 't Hof committed
63
64
65
66
    maxThreads match {
      case Some(max) => if (max > threads) threads else max
      case _         => threads
    }
67
68
69
70
71
72
73
74
75
76
77
  }

  def setResources(): Unit = {
    val firstOutput = try {
      this.firstOutput
    } catch {
      case e: NullPointerException => null
    }

    nCoresRequest = Option(threads)

78
79
80
    /** The 1e retry does not yet upgrade the memory */
    val retryMultipler = if (retry > 1) retry - 1 else 0

81
    _coreMemory = config("core_memory", default = defaultCoreMemory).asDouble +
82
      (0.5 * retryMultipler)
83
84
85
86
87

    if (config.contains("memory_limit")) memoryLimit = config("memory_limit")
    else memoryLimit = Some(_coreMemory * threads)

    if (config.contains("resident_limit")) residentLimit = config("resident_limit")
88
    else residentLimit = Some((_coreMemory + (0.5 * retryMultipler)) * residentFactor)
89

90
    if (!config.contains("vmem")) vmem = Some((_coreMemory * (vmemFactor + (0.5 * retryMultipler))) + "G")
Sander Bollen's avatar
Sander Bollen committed
91
    jobName = configNamespace + ":" + (if (firstOutput != null) firstOutput.getName else jobOutputFile)
92
93
94
95
96
  }

  override def setupRetry(): Unit = {
    super.setupRetry()
    if (vmem.isDefined) jobResourceRequests = jobResourceRequests.filterNot(_.contains("h_vmem="))
97
    if (retry > 0) logger.info("Auto raise memory on retry")
98
    retry += 1
Peter van 't Hof's avatar
Peter van 't Hof committed
99
    this.freezeFieldValues()
100
101
  }

102
103
  var threadsCorrection = 0

104
105
  protected def combineResources(commands: List[CommandLineResources]): Unit = {
    commands.foreach(_.setResources())
106
    nCoresRequest = Some(commands.map(_.threads).sum + threadsCorrection)
107

Sander van der Zeeuw's avatar
Sander van der Zeeuw committed
108
    _coreMemory = commands.map(cmd => cmd.coreMemory * (cmd.threads.toDouble / threads.toDouble)).sum
109
110
111
112
113
114
    memoryLimit = Some(_coreMemory * threads)
    residentLimit = Some((_coreMemory + (0.5 * retry)) * residentFactor)
    vmem = Some((_coreMemory * (vmemFactor + (0.5 * retry))) + "G")
  }

}