library(gsalib)
library(grid)
library(ggplot2)
library(gplots)
library(tools)
library(reshape)
library(plyr)

# Modified version: load `grid` explicitly, since ggplot2 does not attach it (unit() and arrow() are used below)

#
# Standard command line switch.  Can be loaded interactively for development
# or executed with Rscript
#
args = commandArgs(TRUE)
onCMDLine = ! is.na(args[1])
if ( onCMDLine ) {
  inputFileName = args[1]
  outputPDF = args[2]
} else {
  inputFileName = "~/workspaces/scratch/SingleSample_VQSRwNewSOR.jobreport.txt"
  #inputFileName = "/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/Q-25718@node1149.jobreport.txt"
  #inputFileName = "/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/rodPerformanceGoals/history/report.082711.txt"
  outputPDF = NA
}
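
# Typical invocation (the output PDF may be omitted, in which case plots go to the default device):
#   Rscript queueJobReport.R <jobreport.txt> [output.pdf]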

RUNTIME_UNITS = "(hours)"
ORIGINAL_UNITS_TO_RUNTIME_UNITS = 1/1000/60/60
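# The conversion factor above implies the report stores times in milliseconds,
# e.g. 7200000 ms * 1/1000/60/60 = 2 hours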

# 
# Helper function to aggregate all of the jobs in the report across all tables
#
allJobsFromReport <- function(report) {
  names <- c("jobName", "startTime", "analysisName", "doneTime", "exechosts", "runtime", "cores")
  sub <- lapply(report, function(table) table[,names])
  do.call("rbind", sub)
}
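
# Example usage (once the report has been read, near the bottom of this script):
#   allJobs <- allJobsFromReport(gatkReportData)   # one row per job, all tables combined
#   nrow(allJobs)                                  # total number of jobs in the report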

#
# Creates segmentation plots of time (x) vs. job (y) with segments for the duration of the job
#
plotJobsGantt <- function(gatkReport, sortOverall, title, includeText) {
  allJobs = allJobsFromReport(gatkReport)
  if ( sortOverall ) {
    allJobs = allJobs[order(allJobs$analysisName, allJobs$startTime, decreasing=T), ]
  } else {
    allJobs = allJobs[order(allJobs$startTime, decreasing=T), ]
  }
  allJobs$index = 1:nrow(allJobs)
  minTime = min(allJobs$startTime)
  allJobs$relStartTime = allJobs$startTime - minTime 
  allJobs$relDoneTime = allJobs$doneTime - minTime
  allJobs$ganttName = paste(allJobs$jobName, "@", allJobs$exechosts)
  maxRelTime = max(allJobs$relDoneTime)
  p <- ggplot(data=allJobs, aes(x=relStartTime, y=index, color=analysisName))
  p <- p + theme_bw()
  p <- p + geom_segment(aes(xend=relDoneTime, yend=index), size=1, arrow=arrow(length = unit(0.1, "cm")))
  if ( includeText )
    p <- p + geom_text(aes(x=relStartTime, label=ganttName, hjust=0, vjust=-1), size=2)
  p <- p + xlim(0, maxRelTime * 1.3)
  p <- p + xlab(paste("Start time, relative to first job", RUNTIME_UNITS))
  p <- p + ylab("Job number")
  p <- p + ggtitle(title)
  print(p)
}

#
# Plots scheduling efficiency at job events
#
plotProgressByTime <- function(gatkReport) {
  allJobs = allJobsFromReport(gatkReport)
  nJobs = dim(allJobs)[1]
  allJobs = allJobs[order(allJobs$startTime, decreasing=F),]
  allJobs$index = 1:nrow(allJobs)

  minTime = min(allJobs$startTime)
  allJobs$relStartTime = allJobs$startTime - minTime
  allJobs$relDoneTime = allJobs$doneTime - minTime

  times = sort(c(allJobs$relStartTime, allJobs$relDoneTime))

  # For each event time, count the jobs satisfying predicate p(start, end, time)
  countJobs <- function(p) {
    s = allJobs$relStartTime
    e = allJobs$relDoneTime
    sapply(times, function(time) sum(p(s, e, time)))
  }

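  # A job is "pending" before its start time, "done" after its finish time,
  # and "running" in between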
  pending = countJobs(function(s, e, t) s > t)
  done = countJobs(function(s, e, t) e < t)
  running = nJobs - pending - done

  d = data.frame(times=times, pending=pending, running=running, done=done)
  
  p <- ggplot(data=melt(d, id.vars=c("times")), aes(x=times, y=value, color=variable))
  p <- p + facet_grid(variable ~ ., scales="free")
  p <- p + geom_line(size=2)
  p <- p + xlab(paste("Time since start of first job", RUNTIME_UNITS))
  p <- p + ggtitle("Job scheduling")
  print(p)
}

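#
# Plots the number of cores in use over time
#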
plotCoresByTime <- function(gatkReport) {
  allJobs = allJobsFromReport(gatkReport)
  nJobs = sum(allJobs$cores)
  allJobs = allJobs[order(allJobs$startTime, decreasing=F),]
  allJobs$index = 1:nrow(allJobs)
  
  minTime = min(allJobs$startTime)
  allJobs$relStartTime = allJobs$startTime - minTime
  allJobs$relDoneTime = allJobs$doneTime - minTime
  
  times = sort(c(allJobs$relStartTime, allJobs$relDoneTime))
  
  # For each event time, sum the cores of jobs satisfying predicate p(start, end, time)
  countJobs <- function(p) {
    s = allJobs$relStartTime
    e = allJobs$relDoneTime
    cpu = allJobs$cores
    sapply(times, function(time) sum(p(s, e, time) * cpu))
  }
  
  pending = countJobs(function(s, e, t) s > t)
  done = countJobs(function(s, e, t) e < t)
  running = nJobs - pending - done
  
  d = data.frame(times=times, running=running)
  
  p <- ggplot(data=melt(d, id.vars=c("times")), aes(x=times, y=value, color=variable))
  p <- p + facet_grid(variable ~ ., scales="free")
  p <- p + geom_line(size=2)
  p <- p + xlab(paste("Time since start of first job", RUNTIME_UNITS))
  p <- p + ggtitle("Cores used in time")
  print(p)
}


# 
# Creates summary tables and plots for the jobs in each group
#
standardColumns = c("jobName", "startTime", "formattedStartTime", "analysisName", "intermediate", "exechosts", "formattedDoneTime", "doneTime", "runtime")
plotGroup <- function(groupTable) {
  name = unique(groupTable$analysisName)[1]
  groupAnnotations = setdiff(names(groupTable), standardColumns)  
  sub = groupTable[,c("jobName", groupAnnotations, "runtime")]
  sub = sub[order(sub$iteration, sub$jobName, decreasing=F), ]
  
  # create a table showing each job and all annotations
#  textplot(sub, show.rownames=F)
#  title(paste("Job summary for", name, "full itemization"), cex=3)

  # create the table for each combination of values in the group, listing iterations in the columns
#  sum = cast(melt(sub, id.vars=groupAnnotations, measure.vars=c("runtime")), ... ~ iteration, fun.aggregate=mean)
#  textplot(as.data.frame(sum), show.rownames=F)
#  title(paste("Job summary for", name, "itemizing each iteration"), cex=3)

  # histogram of job times by groupAnnotations
  if ( length(groupAnnotations) == 1 && dim(sub)[1] > 1 ) {
    # todo -- how do we group by annotations?
    p <- ggplot(data=sub, aes(x=runtime)) + geom_histogram()
    p <- p + xlab(paste("runtime", RUNTIME_UNITS)) + ylab("No. of jobs")
    p <- p + ggtitle(paste("Job runtime histogram for", name))
    print(p)
  }
  
  # as above, but averaging over all iterations
#  groupAnnotationsNoIteration = setdiff(groupAnnotations, "iteration")
#  if ( dim(sub)[1] > 1 ) {
#    try({ # need a try here because we will fail to reduce when there's just a single iteration
#      sum = cast(melt(sub, id.vars=groupAnnotationsNoIteration, measure.vars=c("runtime")), ... ~ ., fun.aggregate=c(mean, sd))
#      textplot(as.data.frame(sum), show.rownames=F)
#      title(paste("Job summary for", name, "averaging over all iterations"), cex=3)
#    }, silent=T)
#  }
}
    
# print out some useful basic information
print("Report")
print(paste("Project          :", inputFileName))

convertUnits <- function(gatkReportData) {
  convertGroup <- function(g) {
    if (is.null(g$cores)) {g$cores = 1}
    g$runtime = g$runtime * ORIGINAL_UNITS_TO_RUNTIME_UNITS
    g$startTime = g$startTime * ORIGINAL_UNITS_TO_RUNTIME_UNITS
    g$doneTime = g$doneTime * ORIGINAL_UNITS_TO_RUNTIME_UNITS
    g
  }
  lapply(gatkReportData, convertGroup)
}

#
# Plots runtimes by analysis name and exechosts
#
# Useful for understanding the performance of analysis jobs by host,
# and for debugging problematic nodes
# 
plotTimeByHost <- function(gatkReportData) {
  fields = c("analysisName", "exechosts", "runtime")
  
  runtimes = data.frame()
  for ( report in gatkReportData ) {
    runtimes = rbind(runtimes, report[,fields])
  }
  
  plotMe <- function(name, vis) {
    p = ggplot(data=runtimes, aes(x=exechosts, y=runtime, group=exechosts, color=exechosts))
    p = p + facet_grid(analysisName ~ ., scales="free")
    p = p + vis()
    p = p + xlab("Job execution host")
    p = p + ggtitle(paste(name, "of job runtimes by analysis name and execution host"))
    p = p + ylab(paste("Distribution of runtimes", RUNTIME_UNITS))
    p = p + theme(axis.text.x=element_text(angle=45, hjust=1, vjust=1))
    print(p)
  }
  
  plotMe("Boxplot", geom_boxplot)
  plotMe("Jittered points", geom_jitter)
}

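#
# Collapses the scattered jobs of each analysis (grouped by analysisName and
# iteration) into a single summary row: runtime is the sum of the scattered
# runtimes, startTime the earliest start, and exechosts the number of hosts used
#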
mergeScattersForAnalysis <- function(table) {
  #allJobs$ganttName = paste(allJobs$jobName, "@", allJobs$exechosts)
  
  ddply(table, .(analysisName, iteration), summarize,
        jobName = analysisName[1],
        exechosts = paste(length(exechosts), "hosts"),
        formattedStartTime = "NA",
        formattedDoneTime = "NA",
        intermediate = intermediate[1],
        startTime = min(startTime),
        doneTime = min(startTime) + sum(runtime),
        runtime = sum(runtime), 
        cores = min(cores))
}

mergeScatters <- function(report) {
  newReport = list()
  for ( name in names(report) ) {
    newReport[[name]] = mergeScattersForAnalysis(report[[name]])
  }
  newReport
}
  
# read the table
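# (gsa.read.gatkreport returns a named list with one data.frame per GATKReport table)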
gatkReportData <- gsa.read.gatkreport(inputFileName)
gatkReportData <- convertUnits(gatkReportData)
#print(summary(gatkReportData))

if ( ! is.na(outputPDF) ) {
  pdf(outputPDF, height=8.5, width=11)
} 

plotJobsGantt(gatkReportData, T, "All jobs, by analysis, by start time", F)
plotJobsGantt(gatkReportData, F, "All jobs, sorted by start time", F)
plotProgressByTime(gatkReportData)
plotCoresByTime(gatkReportData)

# plots summarizing overall costs, merging scattered counts
merged.by.scatter = mergeScatters(gatkReportData) 
plotJobsGantt(merged.by.scatter, F, "Jobs merged by scatter by start time", T)

merged.as.df = do.call(rbind.data.frame, merged.by.scatter)[,c("analysisName", "runtime", "cores")]
merged.as.df$cputime = merged.as.df$runtime * merged.as.df$cores
merged.as.df$percent = merged.as.df$runtime / sum(merged.as.df$runtime) * 100
merged.as.df$percentCpu = merged.as.df$cputime / sum(merged.as.df$cputime) * 100
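# Worked example (illustrative numbers only): an analysis using 30 of 120 total
# CPU hours gets percentCpu = 30 / 120 * 100 = 25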
merged.as.df.formatted = data.frame(
  analysisName=merged.as.df$analysisName,
  walltime=prettyNum(merged.as.df$runtime),
  percent=prettyNum(merged.as.df$percent,digits=2),
  cores=merged.as.df$cores, 
  cputime=prettyNum(merged.as.df$cputime), 
  percentCpu=prettyNum(merged.as.df$percentCpu,digits=2))
textplot(merged.as.df.formatted[order(merged.as.df$percentCpu),], show.rownames=F)

title("Total runtime for each analysis")

#plotTimeByHost(gatkReportData)
for ( group in gatkReportData ) {
  #print(group)
  plotGroup(group)
}
  
if ( ! is.na(outputPDF) ) {
  dev.off()
  if (exists("compactPDF")) {
    compactPDF(outputPDF)
  }
}