CommandLineResources.scala 5.02 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1 2 3 4 5 6 7 8 9 10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15 16 17 18 19 20
package nl.lumc.sasc.biopet.core

import nl.lumc.sasc.biopet.utils.config.Configurable
import org.broadinstitute.gatk.queue.function.CommandLineFunction

/**
 * This trait controls the resources (threads, memory, wall time) requested for a
 * CommandLineFunction.
 *
 * The settings are read from the config. The derived limits are recomputed by
 * [[setResources]], which runs each time the field values are frozen, including on
 * every retry.
 */
trait CommandLineResources extends CommandLineFunction with Configurable {

  /**
   * This value is overruling threads method when this is set.
   * This can be used to limit the number of threads on a global level.
   */
  lazy val maxThreads: Option[Int] = config("max_threads")

  /** Override this method to use another default number of threads. */
  def defaultThreads: Int = 1

  /**
   * Number of cores requested for this job.
   *
   * Returns `nCoresRequest` when it is already set; otherwise the value is resolved
   * with [[getThreads]], stored in `nCoresRequest` and returned.
   */
  final def threads: Int = nCoresRequest match {
    case Some(cores) => cores
    case _ =>
      val resolved = getThreads
      nCoresRequest = Some(resolved)
      resolved
  }

  /** When true the computed vmem is multiplied by the number of threads. */
  val multiplyVmemThreads: Boolean = config("multiply_vmem_threads", default = true)

  /** When true the computed resident limit is multiplied by the number of threads. */
  val multiplyRssThreads: Boolean = config("multiply_rss_threads", default = true)

  /** Virtual memory request; taken from the config when present, otherwise computed in [[setResources]]. */
  var vmem: Option[String] = config("vmem")

  /** Default for the `core_memory` config value; override to change it. */
  def defaultCoreMemory: Double = 2.0

  /** Default for the `vmem_factor` config value; override to change it. */
  def defaultVmemFactor: Double = 1.4

  /** Default for the `resident_factor` config value; override to change it. */
  def defaultResidentFactor: Double = 1.2

  /** Factor applied to the core memory when computing [[vmem]]. */
  var vmemFactor: Double = config("vmem_factor", default = defaultVmemFactor)

  /** When true, `h_vmem` and `h_rt` requests are added to `jobResourceRequests` on freeze. */
  val useSge: Boolean = config("use_sge", default = true)

  /** Factor applied to the core memory when computing `residentLimit`. */
  var residentFactor: Double = config("resident_factor", default = defaultResidentFactor)

  private var _coreMemory: Double = 2.0

  /** Memory per core as last computed by [[setResources]] or [[combineResources]]. */
  def coreMemory: Double = _coreMemory

  /** This value is for SGE and is defined in seconds */
  wallTime = config("max_walltime")

  /** This value is specific for slurm */
  qualityOfSerice = config("quality_of_serice")

  /** Number of retries so far; incremented by [[setupRetry]]. */
  var retry: Int = 0

  /**
   * Recomputes the resources and, when [[useSge]] is set, appends the `h_vmem` and `h_rt`
   * requests to `jobResourceRequests` before delegating to the parent implementation.
   */
  override def freezeFieldValues(): Unit = {
    setResources()
    if (useSge) {
      vmem.foreach(v => jobResourceRequests :+= s"h_vmem=$v")
      wallTime.foreach(t => jobResourceRequests :+= s"h_rt=$t")
    }
    super.freezeFieldValues()
  }

  /**
   * Get threads from config, using [[defaultThreads]] when the config has no value.
   * @return number of threads, limited by [[maxThreads]] when that is set
   */
  def getThreads: Int = getThreads(defaultThreads)

  /**
   * Get threads from config
   * @param default default when not found in config
   * @return number of threads, limited by [[maxThreads]] when that is set
   */
  private def getThreads(default: Int): Int = {
    val requested: Int = config("threads", default = default)
    maxThreads match {
      case Some(max) => math.min(max, requested)
      case _         => requested
    }
  }

  /**
   * Computes the core, memory, resident and vmem requests and the job name.
   *
   * Values present in the config (`memory_limit`, `resident_limit`, `vmem`) take
   * precedence over the computed ones.
   */
  def setResources(): Unit = {
    // The lookup is guarded because it may throw a NullPointerException (presumably when
    // the outputs are not initialised yet); that case is treated as "no first output".
    val firstOutputFile = try Option(this.firstOutput) catch {
      case _: NullPointerException => None
    }

    val cores = threads
    nCoresRequest = Some(cores)

    // The first retry does not yet upgrade the memory
    val retryMultiplier = if (retry > 1) retry - 1 else 0

    _coreMemory = config("core_memory", default = defaultCoreMemory).asDouble +
      (0.5 * retryMultiplier)

    if (config.contains("memory_limit")) memoryLimit = config("memory_limit")
    else memoryLimit = Some(_coreMemory * cores)

    // NOTE(review): the retry bonus is added here on top of _coreMemory, which already
    // contains it - kept as is, confirm this is intended.
    if (config.contains("resident_limit")) residentLimit = config("resident_limit")
    else {
      val rssThreads = if (multiplyRssThreads) cores else 1
      residentLimit = Some((_coreMemory + (0.5 * retryMultiplier)) * residentFactor * rssThreads)
    }

    if (!config.contains("vmem")) {
      val vmemThreads = if (multiplyVmemThreads) cores else 1
      vmem = Some(s"${_coreMemory * (vmemFactor + (0.5 * retryMultiplier)) * vmemThreads}G")
    }

    // Falls back to the job output file when there is no first output
    jobName = configNamespace + ":" + firstOutputFile.fold(String.valueOf(jobOutputFile))(_.getName)
  }

  /**
   * Prepares a retry: removes the resource requests written by the previous freeze,
   * increases the retry counter and the wait time, and freezes the field values again.
   */
  override def setupRetry(): Unit = {
    super.setupRetry()
    // freezeFieldValues() below appends fresh h_vmem / h_rt requests, so the stale ones
    // must be dropped first; otherwise they pile up on every retry.
    if (vmem.isDefined) jobResourceRequests = jobResourceRequests.filterNot(_.contains("h_vmem="))
    if (useSge && wallTime.isDefined) jobResourceRequests = jobResourceRequests.filterNot(_.contains("h_rt="))
    if (retry > 0) logger.info("Auto raise memory on retry")
    retry += 1
    waitBeforeJob = waitBeforeJob.map(_ + (retry * 10))
    this.freezeFieldValues()
  }

  /** Correction added to the summed thread count in [[combineResources]]. */
  var threadsCorrection: Int = 0

  /**
   * Combines the resources of several commands into the request of this function.
   *
   * The cores are the sum of the threads of all commands plus [[threadsCorrection]],
   * limited by [[maxThreads]]; the core memory is the thread-weighted sum of the core
   * memory of the commands.
   *
   * @param commands commands whose resources are combined
   */
  protected def combineResources(commands: List[CommandLineResources]): Unit = {
    commands.foreach(_.setResources())

    val requested = commands.map(_.threads).sum + threadsCorrection
    val cores = maxThreads.fold(requested)(max => math.min(max, requested))
    nCoresRequest = Some(cores)

    _coreMemory = commands.map(cmd => cmd.coreMemory * (cmd.threads.toDouble / cores.toDouble)).sum
    memoryLimit = Some(_coreMemory * cores)

    val rssThreads = if (multiplyRssThreads) cores else 1
    residentLimit = Some((_coreMemory + (0.5 * retry)) * residentFactor * rssThreads)

    val vmemThreads = if (multiplyVmemThreads) cores else 1
    vmem = Some(s"${_coreMemory * (vmemFactor + (0.5 * retry)) * vmemThreads}G")
  }

}