From 58bc1071ea64ccd0df4a5b1ee54293ca80368f11 Mon Sep 17 00:00:00 2001 From: jianghe1 Date: Wed, 27 May 2020 14:22:42 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app-conf/AggregatorConf.xml | 24 +- app-conf/FetcherConf.xml | 25 +- app-conf/HeuristicConf.xml | 326 --------------- app-conf/HeuristicConf_bak.xml | 379 ++++++++++++++++++ app-conf/elephant.conf | 8 +- app-conf/resolver.conf | 8 + .../linkedin/drelephant/ElephantRunner.java | 4 + .../drelephant/analysis/AnalyticJob.java | 2 + .../analysis/AnalyticJobGeneratorHadoop2.java | 3 + .../spark/fetchers/SparkFetcher.scala | 2 +- .../spark/fetchers/SparkRestClient.scala | 6 +- .../heuristics/ConfigurationHeuristic.scala | 8 +- .../spark/heuristics/ExecutorsHeuristic.scala | 2 + .../tony/TonyMetricsAggregator.java | 2 +- .../tony/data/TonyApplicationData.java | 2 +- .../drelephant/tony/fetchers/TonyFetcher.java | 2 +- app/com/linkedin/drelephant/util/Utils.java | 5 +- .../deploy/history/SparkDataCollection.scala | 49 ++- .../spark/deploy/history/SparkFSFetcher.scala | 12 +- .../sql/catalyst/catalog/events/events.scala | 204 ++++++++++ .../StorageStatusTrackingListener.scala | 6 +- compile.conf | 2 +- compile.sh | 2 +- conf/evolutions/default/1.sql | 18 +- conf/evolutions/default/5.sql | 26 +- conf/log4j.properties | 4 +- project/Dependencies.scala | 11 +- project/plugins.sbt | 2 +- public/assets/bootstrap/js/bootstrap.js | 4 +- public/assets/d3/d3.js | 6 +- public/assets/d3/d3.min.js | 2 +- .../azkaban/AzkabanJobLogAnalyzerTest.java | 8 +- .../tony/TonyMetricsAggregatorTest.java | 2 +- .../tony/fetchers/TonyFetcherTest.java | 4 +- .../drelephant/util/InfoExtractorTest.java | 16 +- 35 files changed, 741 insertions(+), 445 deletions(-) create mode 100644 app-conf/HeuristicConf_bak.xml create mode 100644 app-conf/resolver.conf create mode 100644 app/org/apache/spark/sql/catalyst/catalog/events/events.scala diff --git a/app-conf/AggregatorConf.xml b/app-conf/AggregatorConf.xml index 3fecc3300..c9564185b 100644 --- a/app-conf/AggregatorConf.xml +++ b/app-conf/AggregatorConf.xml @@ -29,14 +29,14 @@ --> - - mapreduce - com.linkedin.drelephant.mapreduce.MapReduceMetricsAggregator - - - tez - com.linkedin.drelephant.tez.TezMetricsAggregator - + + + + + + + + spark com.linkedin.drelephant.spark.SparkMetricsAggregator @@ -44,8 +44,8 @@ 0.5 - - tony - com.linkedin.drelephant.tony.TonyMetricsAggregator - + + + + diff --git a/app-conf/FetcherConf.xml b/app-conf/FetcherConf.xml index 1f5e49308..2bf8f0c2d 100644 --- a/app-conf/FetcherConf.xml +++ b/app-conf/FetcherConf.xml @@ -32,10 +32,10 @@ - - tez - com.linkedin.drelephant.tez.fetchers.TezFetcher - + + + + - + + - - spark - com.linkedin.drelephant.spark.fetchers.FSFetcher - + + + + spark com.linkedin.drelephant.spark.fetchers.SparkFetcher @@ -115,7 +116,7 @@ true - --> + - - - - tez - Tez Mapper Data Skew - com.linkedin.drelephant.tez.heuristics.MapperDataSkewHeuristic - views.html.help.tez.helpMapperDataSkew - - - - - tez - Tez Mapper GC - com.linkedin.drelephant.tez.heuristics.MapperGCHeuristic - views.html.help.tez.helpGC - - - - - tez - Tez Mapper Time - com.linkedin.drelephant.tez.heuristics.MapperTimeHeuristic - views.html.help.tez.helpMapperTime - - - - - tez - Tez Mapper Speed - com.linkedin.drelephant.tez.heuristics.MapperSpeedHeuristic - views.html.help.tez.helpMapperSpeed - - - - - tez - Tez Mapper Memory - com.linkedin.drelephant.tez.heuristics.MapperMemoryHeuristic - views.html.help.tez.helpMemory - - - - - tez - Tez Mapper Spill - com.linkedin.drelephant.tez.heuristics.MapperSpillHeuristic - views.html.help.tez.helpMapperSpill - - - - - tez - Tez Reducer Data Skew - com.linkedin.drelephant.tez.heuristics.ReducerDataSkewHeuristic - views.html.help.mapreduce.helpReducerSkew - - - - - tez - Tez Reducer GC - com.linkedin.drelephant.tez.heuristics.ReducerGCHeuristic - views.html.help.mapreduce.helpGC - - - - - tez - Tez Reducer Time - com.linkedin.drelephant.tez.heuristics.ReducerTimeHeuristic - views.html.help.tez.helpReducerTime - - - - - tez - Scope Memory - com.linkedin.drelephant.tez.heuristics.TezScopeMemoryHeuristic - views.html.help.tez.helpMemory - - - - tez - Scope Data Skew - com.linkedin.drelephant.tez.heuristics.TezScopeDataSkewHeuristic - views.html.help.tez.helpScopeTaskDataSkew - - - - tez - Scope GC - com.linkedin.drelephant.tez.heuristics.TezScopeGCHeuristic - views.html.help.tez.helpGC - - - - tez - Scope Speed - com.linkedin.drelephant.tez.heuristics.TezScopeSpeedHeuristic - views.html.help.tez.helpMapperSpeed - - - - - tez - Scope Time - com.linkedin.drelephant.tez.heuristics.TezScopeTimeHeuristic - views.html.help.tez.helpScopeTaskTime - - - - - - - mapreduce - Mapper Skew - com.linkedin.drelephant.mapreduce.heuristics.MapperSkewHeuristic - views.html.help.mapreduce.helpMapperSkew - - - - - mapreduce - Mapper GC - com.linkedin.drelephant.mapreduce.heuristics.MapperGCHeuristic - views.html.help.mapreduce.helpGC - - - - - mapreduce - Mapper Time - com.linkedin.drelephant.mapreduce.heuristics.MapperTimeHeuristic - views.html.help.mapreduce.helpMapperTime - - - - - mapreduce - Mapper Speed - com.linkedin.drelephant.mapreduce.heuristics.MapperSpeedHeuristic - views.html.help.mapreduce.helpMapperSpeed - - - - - mapreduce - Mapper Spill - com.linkedin.drelephant.mapreduce.heuristics.MapperSpillHeuristic - views.html.help.mapreduce.helpMapperSpill - - - - - mapreduce - Mapper Memory - com.linkedin.drelephant.mapreduce.heuristics.MapperMemoryHeuristic - views.html.help.mapreduce.helpMapperMemory - - - - - mapreduce - Reducer Skew - com.linkedin.drelephant.mapreduce.heuristics.ReducerSkewHeuristic - views.html.help.mapreduce.helpReducerSkew - - - - - mapreduce - Reducer GC - com.linkedin.drelephant.mapreduce.heuristics.ReducerGCHeuristic - views.html.help.mapreduce.helpGC - - - - - mapreduce - Reducer Time - com.linkedin.drelephant.mapreduce.heuristics.ReducerTimeHeuristic - views.html.help.mapreduce.helpReducerTime - - - - - mapreduce - Reducer Memory - com.linkedin.drelephant.mapreduce.heuristics.ReducerMemoryHeuristic - views.html.help.mapreduce.helpReducerMemory - - - - - mapreduce - Shuffle & Sort - com.linkedin.drelephant.mapreduce.heuristics.ShuffleSortHeuristic - views.html.help.mapreduce.helpShuffleSort - - - - - mapreduce - Exception - com.linkedin.drelephant.mapreduce.heuristics.ExceptionHeuristic - views.html.help.mapreduce.helpException - - - - mapreduce - Distributed Cache Limit - com.linkedin.drelephant.mapreduce.heuristics.DistributedCacheLimitHeuristic - views.html.help.mapreduce.helpDistributedCacheLimit - - 500000000 - - - - @@ -354,26 +50,4 @@ views.html.help.spark.helpExecutorGcHeuristic - - - - tony - Task Memory - com.linkedin.drelephant.tony.heuristics.TaskMemoryHeuristic - views.html.help.tony.helpTaskMemory - - - - - - - - - tony - Task GPU - com.linkedin.drelephant.tony.heuristics.TaskGPUHeuristic - views.html.help.tony.helpTaskGPU - - - diff --git a/app-conf/HeuristicConf_bak.xml b/app-conf/HeuristicConf_bak.xml new file mode 100644 index 000000000..c14f85556 --- /dev/null +++ b/app-conf/HeuristicConf_bak.xml @@ -0,0 +1,379 @@ + + + + + + + + + tez + Tez Mapper Data Skew + com.linkedin.drelephant.tez.heuristics.MapperDataSkewHeuristic + views.html.help.tez.helpMapperDataSkew + + + + + tez + Tez Mapper GC + com.linkedin.drelephant.tez.heuristics.MapperGCHeuristic + views.html.help.tez.helpGC + + + + + tez + Tez Mapper Time + com.linkedin.drelephant.tez.heuristics.MapperTimeHeuristic + views.html.help.tez.helpMapperTime + + + + + tez + Tez Mapper Speed + com.linkedin.drelephant.tez.heuristics.MapperSpeedHeuristic + views.html.help.tez.helpMapperSpeed + + + + + tez + Tez Mapper Memory + com.linkedin.drelephant.tez.heuristics.MapperMemoryHeuristic + views.html.help.tez.helpMemory + + + + + tez + Tez Mapper Spill + com.linkedin.drelephant.tez.heuristics.MapperSpillHeuristic + views.html.help.tez.helpMapperSpill + + + + + tez + Tez Reducer Data Skew + com.linkedin.drelephant.tez.heuristics.ReducerDataSkewHeuristic + views.html.help.mapreduce.helpReducerSkew + + + + + tez + Tez Reducer GC + com.linkedin.drelephant.tez.heuristics.ReducerGCHeuristic + views.html.help.mapreduce.helpGC + + + + + tez + Tez Reducer Time + com.linkedin.drelephant.tez.heuristics.ReducerTimeHeuristic + views.html.help.tez.helpReducerTime + + + + + tez + Scope Memory + com.linkedin.drelephant.tez.heuristics.TezScopeMemoryHeuristic + views.html.help.tez.helpMemory + + + + tez + Scope Data Skew + com.linkedin.drelephant.tez.heuristics.TezScopeDataSkewHeuristic + views.html.help.tez.helpScopeTaskDataSkew + + + + tez + Scope GC + com.linkedin.drelephant.tez.heuristics.TezScopeGCHeuristic + views.html.help.tez.helpGC + + + + tez + Scope Speed + com.linkedin.drelephant.tez.heuristics.TezScopeSpeedHeuristic + views.html.help.tez.helpMapperSpeed + + + + + tez + Scope Time + com.linkedin.drelephant.tez.heuristics.TezScopeTimeHeuristic + views.html.help.tez.helpScopeTaskTime + + + + + + + mapreduce + Mapper Skew + com.linkedin.drelephant.mapreduce.heuristics.MapperSkewHeuristic + views.html.help.mapreduce.helpMapperSkew + + + + + mapreduce + Mapper GC + com.linkedin.drelephant.mapreduce.heuristics.MapperGCHeuristic + views.html.help.mapreduce.helpGC + + + + + mapreduce + Mapper Time + com.linkedin.drelephant.mapreduce.heuristics.MapperTimeHeuristic + views.html.help.mapreduce.helpMapperTime + + + + + mapreduce + Mapper Speed + com.linkedin.drelephant.mapreduce.heuristics.MapperSpeedHeuristic + views.html.help.mapreduce.helpMapperSpeed + + + + + mapreduce + Mapper Spill + com.linkedin.drelephant.mapreduce.heuristics.MapperSpillHeuristic + views.html.help.mapreduce.helpMapperSpill + + + + + mapreduce + Mapper Memory + com.linkedin.drelephant.mapreduce.heuristics.MapperMemoryHeuristic + views.html.help.mapreduce.helpMapperMemory + + + + + mapreduce + Reducer Skew + com.linkedin.drelephant.mapreduce.heuristics.ReducerSkewHeuristic + views.html.help.mapreduce.helpReducerSkew + + + + + mapreduce + Reducer GC + com.linkedin.drelephant.mapreduce.heuristics.ReducerGCHeuristic + views.html.help.mapreduce.helpGC + + + + + mapreduce + Reducer Time + com.linkedin.drelephant.mapreduce.heuristics.ReducerTimeHeuristic + views.html.help.mapreduce.helpReducerTime + + + + + mapreduce + Reducer Memory + com.linkedin.drelephant.mapreduce.heuristics.ReducerMemoryHeuristic + views.html.help.mapreduce.helpReducerMemory + + + + + mapreduce + Shuffle & Sort + com.linkedin.drelephant.mapreduce.heuristics.ShuffleSortHeuristic + views.html.help.mapreduce.helpShuffleSort + + + + + mapreduce + Exception + com.linkedin.drelephant.mapreduce.heuristics.ExceptionHeuristic + views.html.help.mapreduce.helpException + + + + mapreduce + Distributed Cache Limit + com.linkedin.drelephant.mapreduce.heuristics.DistributedCacheLimitHeuristic + views.html.help.mapreduce.helpDistributedCacheLimit + + 500000000 + + + + + + + + spark + Spark Configuration + com.linkedin.drelephant.spark.heuristics.ConfigurationHeuristic + views.html.help.spark.helpConfigurationHeuristic + + + spark + Spark Executor Metrics + com.linkedin.drelephant.spark.heuristics.ExecutorsHeuristic + views.html.help.spark.helpExecutorsHeuristic + + + spark + Spark Job Metrics + com.linkedin.drelephant.spark.heuristics.JobsHeuristic + views.html.help.spark.helpJobsHeuristic + + + spark + Spark Stage Metrics + com.linkedin.drelephant.spark.heuristics.StagesHeuristic + views.html.help.spark.helpStagesHeuristic + + + spark + Executor GC + com.linkedin.drelephant.spark.heuristics.ExecutorGcHeuristic + views.html.help.spark.helpExecutorGcHeuristic + + + + + + tony + Task Memory + com.linkedin.drelephant.tony.heuristics.TaskMemoryHeuristic + views.html.help.tony.helpTaskMemory + + + + + + + + + tony + Task GPU + com.linkedin.drelephant.tony.heuristics.TaskGPUHeuristic + views.html.help.tony.helpTaskGPU + + + + diff --git a/app-conf/elephant.conf b/app-conf/elephant.conf index bc65ddf02..5f6ffc653 100644 --- a/app-conf/elephant.conf +++ b/app-conf/elephant.conf @@ -1,5 +1,5 @@ # Play application server port -http_port=8080 +http_port=8720 # Un-comment these configs if need to enable SSL for Dr.Elephant #https_port=8090 @@ -15,10 +15,10 @@ http_port=8080 # application_secret="changeme" # Database configuration -db_url=localhost +db_url=172.22.180.19 db_name=drelephant -db_user=root -db_password="" +db_user=salt +db_password=salt # Enable web analytics for the application. # By default analytics is not turned on. Set this property diff --git a/app-conf/resolver.conf b/app-conf/resolver.conf new file mode 100644 index 000000000..247f5246d --- /dev/null +++ b/app-conf/resolver.conf @@ -0,0 +1,8 @@ +[repositories] + local + maven-central: https://repo1.maven.org/maven2/ + play-central: https://repo1.maven.org/maven2/com/typesafe/ + tysafe-maven: https://repo.typesafe.com/typesafe/maven-releases/ + sbt-plugin-repo: https://repo.scala-sbt.org/scalasbt/sbt-plugin-releases, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext] + typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly + \ No newline at end of file diff --git a/app/com/linkedin/drelephant/ElephantRunner.java b/app/com/linkedin/drelephant/ElephantRunner.java index 55e6a8fe2..2518add83 100644 --- a/app/com/linkedin/drelephant/ElephantRunner.java +++ b/app/com/linkedin/drelephant/ElephantRunner.java @@ -20,6 +20,7 @@ import com.avaje.ebean.TxRunnable; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.gson.Gson; import com.linkedin.drelephant.analysis.AnalyticJob; import com.linkedin.drelephant.analysis.AnalyticJobGenerator; import com.linkedin.drelephant.analysis.AnalyticJobGeneratorHadoop2; @@ -188,6 +189,7 @@ public Void run() { List todos; try { todos = _analyticJobGenerator.fetchAnalyticJobs(); + logger.info("return todos size :" + todos==null?0:todos.size()); } catch (Exception e) { logger.error("Error fetching job list. Try again later...", e); //Wait for a while before retry @@ -386,6 +388,7 @@ public void run() { long applicableFinishTime = -1; long jobFinishTime = -1; FinishTimeInfo finishTimeInfo = null; + Gson gson = new Gson(); try { final AppResult result = _analyticJob.getAnalysis(); applicableFinishTime = getApplicableFinishTime(_analyticJob); @@ -396,6 +399,7 @@ public void run() { // Execute as a transaction. Ebean.execute(new TxRunnable() { public void run() { + logger.info(String.format("分析结果存储%s:%s", result.id, gson.toJson(result))); result.save(); if (backfillInfo != null) { backfillInfo.save(); diff --git a/app/com/linkedin/drelephant/analysis/AnalyticJob.java b/app/com/linkedin/drelephant/analysis/AnalyticJob.java index b7aec1a98..fb41a56d8 100644 --- a/app/com/linkedin/drelephant/analysis/AnalyticJob.java +++ b/app/com/linkedin/drelephant/analysis/AnalyticJob.java @@ -317,9 +317,11 @@ public AppResult getAnalysis() throws Exception { logger.info("No Data Received for analytic job: " + getAppId()); analysisResults.add(HeuristicResult.NO_DATA); } else { + logger.info("getHeuristicsForApplicationType"); List heuristics = ElephantContext.instance().getHeuristicsForApplicationType(getAppType()); for (Heuristic heuristic : heuristics) { String confExcludedApps = heuristic.getHeuristicConfData().getParamMap().get(EXCLUDE_JOBTYPE); + logger.info("confExcludedApps: " + confExcludedApps + " cls:" + heuristic.getHeuristicConfData().getClassName()); if (confExcludedApps == null || confExcludedApps.length() == 0 || !Arrays.asList(confExcludedApps.split(",")).contains(jobTypeName)) { diff --git a/app/com/linkedin/drelephant/analysis/AnalyticJobGeneratorHadoop2.java b/app/com/linkedin/drelephant/analysis/AnalyticJobGeneratorHadoop2.java index 69489812f..fa128f505 100644 --- a/app/com/linkedin/drelephant/analysis/AnalyticJobGeneratorHadoop2.java +++ b/app/com/linkedin/drelephant/analysis/AnalyticJobGeneratorHadoop2.java @@ -69,8 +69,10 @@ public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator { private final List _secondRetryQueue = new LinkedList(); public void updateResourceManagerAddresses() { + logger.info("configuration.get(IS_RM_HA_ENABLED):" + configuration.get(IS_RM_HA_ENABLED)); if (Boolean.valueOf(configuration.get(IS_RM_HA_ENABLED))) { String resourceManagers = configuration.get(RESOURCE_MANAGER_IDS); + logger.info("updateResourceManagerAddresses_resourceManagers: " + resourceManagers); if (resourceManagers != null) { logger.info("The list of RM IDs are " + resourceManagers); List ids = Arrays.asList(resourceManagers.split(",")); @@ -144,6 +146,7 @@ public List fetchAnalyticJobs() // There is a lag of job data from AM/NM to JobHistoryServer HDFS, we shouldn't use the current time, since there // might be new jobs arriving after we fetch jobs. We provide one minute delay to address this lag. _currentTime = System.currentTimeMillis() - FETCH_DELAY; + _lastTime = _currentTime - FETCH_DELAY; updateAuthToken(); logger.info("Fetching recent finished application runs between last time: " + (_lastTime + 1) diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala index 7ee5b47c7..b6bfcebab 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala @@ -189,6 +189,6 @@ object SparkFetcher { } val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled" - val DEFAULT_TIMEOUT = Duration(5, SECONDS) + val DEFAULT_TIMEOUT = Duration(30, SECONDS) //默认5s,暂时修改成30s val LOG_LOCATION_URI_XML_FIELD = "event_log_location_uri" } diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala index abeac17ab..e056d8fe0 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala @@ -285,9 +285,9 @@ object SparkRestClient { val HISTORY_SERVER_ADDRESS_KEY = "spark.yarn.historyServer.address" val API_V1_MOUNT_PATH = "api/v1" val IN_PROGRESS = ".inprogress" - val DEFAULT_TIMEOUT = Duration(5, SECONDS); - val CONNECTION_TIMEOUT = 5000 - val READ_TIMEOUT = 5000 + val DEFAULT_TIMEOUT = Duration(10, SECONDS); + val CONNECTION_TIMEOUT = 10000 + val READ_TIMEOUT = 10000 val SparkRestObjectMapper = { val dateFormat = { diff --git a/app/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristic.scala index 0c0193ef5..c7f3d9701 100644 --- a/app/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristic.scala +++ b/app/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristic.scala @@ -148,7 +148,7 @@ object ConfigurationHeuristic { val SPARK_YARN_JARS = "spark.yarn.secondary.jars" val SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD = "spark.yarn.executor.memoryOverhead" val SPARK_YARN_DRIVER_MEMORY_OVERHEAD = "spark.yarn.driver.memoryOverhead" - val THRESHOLD_MIN_EXECUTORS: Int = 1 + val THRESHOLD_MIN_EXECUTORS: Int = 2 //默认1 val THRESHOLD_MAX_EXECUTORS: Int = 900 val SPARK_OVERHEAD_MEMORY_THRESHOLD_KEY = "spark.overheadMemory.thresholds.key" val DEFAULT_SPARK_OVERHEAD_MEMORY_THRESHOLDS = @@ -158,6 +158,9 @@ object ConfigurationHeuristic { class Evaluator(configurationHeuristic: ConfigurationHeuristic, data: SparkApplicationData) { lazy val appConfigurationProperties: Map[String, String] = data.appConfigurationProperties + println("start====================") + appConfigurationProperties.foreach(x => println(x._1, x._2)) + println("end====================") lazy val driverMemoryBytes: Option[Long] = Try(getProperty(SPARK_DRIVER_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes)).getOrElse(None) @@ -187,6 +190,7 @@ object ConfigurationHeuristic { } lazy val sparkYarnJars: String = getProperty(SPARK_YARN_JARS).getOrElse("") + println("sparkYarnJars=" + sparkYarnJars) lazy val jarsSeverity: Severity = if (sparkYarnJars.contains("*")) { Severity.CRITICAL @@ -235,6 +239,8 @@ object ConfigurationHeuristic { val severityExecutorMemoryOverhead = configurationHeuristic.sparkOverheadMemoryThreshold.severityOf(MemoryFormatUtils.stringToBytes(sparkYarnExecutorMemoryOverhead)) val severityDriverMemoryOverhead = configurationHeuristic.sparkOverheadMemoryThreshold.severityOf(MemoryFormatUtils.stringToBytes(sparkYarnDriverMemoryOverhead)) + println("sparkYarnExecutorMemoryOverhead=" + MemoryFormatUtils.stringToBytes(sparkYarnExecutorMemoryOverhead) + " severityExecutorMemoryOverhead=" + severityExecutorMemoryOverhead) + println("sparkYarnDriverMemoryOverhead=" + MemoryFormatUtils.stringToBytes(sparkYarnDriverMemoryOverhead) + " severityDriverMemoryOverhead=" + severityDriverMemoryOverhead) //Severity for the configuration thresholds val severityConfThresholds: Severity = Severity.max(severityDriverCores, severityDriverMemory, severityExecutorCores, severityExecutorMemory, diff --git a/app/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristic.scala index dae604124..6088cc7f2 100644 --- a/app/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristic.scala +++ b/app/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristic.scala @@ -214,6 +214,8 @@ object ExecutorsHeuristic { private lazy val ignoreMaxBytesLessThanThreshold = executorsHeuristic.ignoreMaxBytesLessThanThreshold private lazy val ignoreMaxMillisLessThanThreshold = executorsHeuristic.ignoreMaxMillisLessThanThreshold + + println( "maxToMedianRatioSeverityThresholds=" + maxToMedianRatioSeverityThresholds) } case class Distribution(min: Long, p25: Long, median: Long, p75: Long, max: Long) diff --git a/app/com/linkedin/drelephant/tony/TonyMetricsAggregator.java b/app/com/linkedin/drelephant/tony/TonyMetricsAggregator.java index 540e54d22..d5ec39d65 100644 --- a/app/com/linkedin/drelephant/tony/TonyMetricsAggregator.java +++ b/app/com/linkedin/drelephant/tony/TonyMetricsAggregator.java @@ -66,7 +66,7 @@ public void aggregate(HadoopApplicationData data) { for (TonyTaskData taskData : entry.getValue().values()) { long taskDurationSec = (taskData.getTaskEndTime() - taskData.getTaskStartTime()) / Statistics.SECOND_IN_MS; if (taskDurationSec < 0) { - // Most likely TASK_FINISHED and APPLICATION_FINISHED events are missing for the task. + // Most likely TASK_FINISHED and APPLICATION_FINISHED events.scala are missing for the task. continue; } mbSecUsed += mbRequested * taskDurationSec; diff --git a/app/com/linkedin/drelephant/tony/data/TonyApplicationData.java b/app/com/linkedin/drelephant/tony/data/TonyApplicationData.java index 7021da0a5..a450b71b2 100644 --- a/app/com/linkedin/drelephant/tony/data/TonyApplicationData.java +++ b/app/com/linkedin/drelephant/tony/data/TonyApplicationData.java @@ -40,7 +40,7 @@ public class TonyApplicationData implements HadoopApplicationData { * @param appId application id * @param appType application type (should be TONY) * @param configuration the configuration for this application - * @param events the events emitted by this application + * @param events the events.scala emitted by this application */ public TonyApplicationData(String appId, ApplicationType appType, Configuration configuration, List events) { _appId = appId; diff --git a/app/com/linkedin/drelephant/tony/fetchers/TonyFetcher.java b/app/com/linkedin/drelephant/tony/fetchers/TonyFetcher.java index 82e271bb0..a7d580ee0 100644 --- a/app/com/linkedin/drelephant/tony/fetchers/TonyFetcher.java +++ b/app/com/linkedin/drelephant/tony/fetchers/TonyFetcher.java @@ -99,7 +99,7 @@ public TonyApplicationData fetchData(AnalyticJob job) throws Exception { conf.addResource(_fs.open(confFile)); } - // Parse events. For a list of event types, see + // Parse events.scala. For a list of event types, see // https://github.com/linkedin/TonY/blob/master/tony-core/src/main/avro/EventType.avsc. // We get the task start time from the TASK_STARTED event and the finish time and metrics from the TASK_FINISHED // event. diff --git a/app/com/linkedin/drelephant/util/Utils.java b/app/com/linkedin/drelephant/util/Utils.java index f88d17c46..541665d6c 100644 --- a/app/com/linkedin/drelephant/util/Utils.java +++ b/app/com/linkedin/drelephant/util/Utils.java @@ -157,6 +157,7 @@ public static Map parseJavaOptions(String str) { public static double[] getParam(String rawLimits, int thresholdLevels) { double[] parsedLimits = null; + logger.info(String.format("rawLimits=%s, thresholdLevels=%s", rawLimits, thresholdLevels)); if (rawLimits != null && !rawLimits.isEmpty()) { String[] thresholds = rawLimits.split(","); if (thresholds.length != thresholdLevels) { @@ -177,6 +178,7 @@ public static double[] getParam(String rawLimits, int thresholdLevels) { } } } + logger.info(String.format("parsedLimits=%s", parsedLimits==null?"":parsedLimits.toString())); return parsedLimits; } @@ -258,7 +260,8 @@ public static Map parseCsKeyValue(String str) { */ public static String truncateField(String field, int limit, String context) { if (field != null && limit > TRUNCATE_SUFFIX.length() && field.length() > limit) { - logger.info("Truncating " + field + " to " + limit + " characters for " + context); +// logger.info("Truncating " + field + " to " + limit + " characters for " + context); + logger.info("Truncating " + limit + " characters for " + context); field = field.substring(0, limit - 3) + "..."; } return field; diff --git a/app/org/apache/spark/deploy/history/SparkDataCollection.scala b/app/org/apache/spark/deploy/history/SparkDataCollection.scala index f60fcfa19..56eabebd9 100644 --- a/app/org/apache/spark/deploy/history/SparkDataCollection.scala +++ b/app/org/apache/spark/deploy/history/SparkDataCollection.scala @@ -49,32 +49,32 @@ class SparkDataCollection extends SparkApplicationData { lazy val applicationEventListener = new ApplicationEventListener() lazy val jobProgressListener = new JobProgressListener(new SparkConf()) lazy val environmentListener = new EnvironmentListener() - lazy val storageStatusListener = new StorageStatusListener() - lazy val executorsListener = new ExecutorsListener(storageStatusListener) + lazy val storageStatusListener = new StorageStatusListener(new SparkConf()) + lazy val executorsListener = new ExecutorsListener(storageStatusListener, new SparkConf()) lazy val storageListener = new StorageListener(storageStatusListener) // This is a customized listener that tracks peak used memory // The original listener only tracks the current in use memory which is useless in offline scenario. lazy val storageStatusTrackingListener = new StorageStatusTrackingListener() - private var _applicationData: SparkGeneralData = null; - private var _jobProgressData: SparkJobProgressData = null; - private var _environmentData: SparkEnvironmentData = null; - private var _executorData: SparkExecutorData = null; - private var _storageData: SparkStorageData = null; - private var _isThrottled: Boolean = false; + private var _applicationData: SparkGeneralData = _ + private var _jobProgressData: SparkJobProgressData = _ + private var _environmentData: SparkEnvironmentData = _ + private var _executorData: SparkExecutorData = _ + private var _storageData: SparkStorageData = _ + private var _isThrottled: Boolean = false def throttle(): Unit = { _isThrottled = true } - override def isThrottled(): Boolean = _isThrottled + override def isThrottled: Boolean = _isThrottled override def getApplicationType(): ApplicationType = APPLICATION_TYPE - override def getConf(): Properties = getEnvironmentData().getSparkProperties() + override def getConf(): Properties = getEnvironmentData().getSparkProperties - override def isEmpty(): Boolean = !isThrottled() && getExecutorData().getExecutors.isEmpty() + override def isEmpty(): Boolean = !isThrottled() && getExecutorData().getExecutors.isEmpty override def getGeneralData(): SparkGeneralData = { if (_applicationData == null) { @@ -164,10 +164,13 @@ class SparkDataCollection extends SparkApplicationData { if (_executorData == null) { _executorData = new SparkExecutorData() - for (statusId <- 0 until executorsListener.storageStatusList.size) { + val storageStatusList = executorsListener.activeStorageStatusList ++ executorsListener.deadStorageStatusList + + // for (statusId <- 0 until executorsListener.storageStatusList.size) { + for (statusId <- 0 until storageStatusList.size) { val info = new ExecutorInfo() - val status = executorsListener.storageStatusList(statusId) + val status = storageStatusList(statusId) info.execId = status.blockManagerId.executorId info.hostPort = status.blockManagerId.hostPort @@ -178,14 +181,18 @@ class SparkDataCollection extends SparkApplicationData { info.memUsed = storageStatusTrackingListener.executorIdToMaxUsedMem.getOrElse(info.execId, 0L) info.maxMem = status.maxMem info.diskUsed = status.diskUsed - info.activeTasks = executorsListener.executorToTasksActive.getOrElse(info.execId, 0) - info.failedTasks = executorsListener.executorToTasksFailed.getOrElse(info.execId, 0) - info.completedTasks = executorsListener.executorToTasksComplete.getOrElse(info.execId, 0) - info.totalTasks = info.activeTasks + info.failedTasks + info.completedTasks - info.duration = executorsListener.executorToDuration.getOrElse(info.execId, 0L) - info.inputBytes = executorsListener.executorToInputBytes.getOrElse(info.execId, 0L) - info.shuffleRead = executorsListener.executorToShuffleRead.getOrElse(info.execId, 0L) - info.shuffleWrite = executorsListener.executorToShuffleWrite.getOrElse(info.execId, 0L) + if (!info.execId.equalsIgnoreCase("driver")) { + info.activeTasks = executorsListener.executorToTaskSummary(info.execId).tasksActive + info.failedTasks = executorsListener.executorToTaskSummary(info.execId).tasksFailed + info.completedTasks = executorsListener.executorToTaskSummary(info.execId).tasksComplete + info.totalTasks = info.activeTasks + info.failedTasks + info.completedTasks + info.duration = executorsListener.executorToTaskSummary(info.execId).duration + info.inputBytes = executorsListener.executorToTaskSummary(info.execId).inputBytes + info.outputBytes = executorsListener.executorToTaskSummary(info.execId).outputBytes + info.shuffleRead = executorsListener.executorToTaskSummary(info.execId).shuffleRead + info.shuffleWrite = executorsListener.executorToTaskSummary(info.execId).shuffleWrite + info.totalGCTime = executorsListener.executorToTaskSummary(info.execId).jvmGCTime + } _executorData.setExecutorInfo(info.execId, info) } diff --git a/app/org/apache/spark/deploy/history/SparkFSFetcher.scala b/app/org/apache/spark/deploy/history/SparkFSFetcher.scala index ecac7f608..db2b3c4d4 100644 --- a/app/org/apache/spark/deploy/history/SparkFSFetcher.scala +++ b/app/org/apache/spark/deploy/history/SparkFSFetcher.scala @@ -16,7 +16,6 @@ package org.apache.spark.deploy.history -import java.io.InputStream import java.security.PrivilegedAction import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher} @@ -26,15 +25,8 @@ import com.linkedin.drelephant.spark.legacydata.SparkApplicationData import com.linkedin.drelephant.util.{HadoopUtils, SparkUtils, Utils} import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.log4j.Logger import org.apache.spark.SparkConf -import org.apache.spark.scheduler.{ApplicationEventListener, ReplayListenerBus} -import org.apache.spark.storage.{StorageStatusListener, StorageStatusTrackingListener} -import org.apache.spark.ui.env.EnvironmentListener -import org.apache.spark.ui.exec.ExecutorsListener -import org.apache.spark.ui.jobs.JobProgressListener -import org.apache.spark.ui.storage.StorageListener /** @@ -63,7 +55,9 @@ class SparkFSFetcher(fetcherConfData: FetcherConfigurationData) extends Elephant protected lazy val sparkConf: SparkConf = { val sparkConf = new SparkConf() sparkUtils.getDefaultPropertiesFile() match { - case Some(filename) => sparkConf.setAll(sparkUtils.getPropertiesFromFile(filename)) + case Some(filename) => + logger.info("Found spark conf file: " + filename) + sparkConf.setAll(sparkUtils.getPropertiesFromFile(filename)) case None => throw new IllegalStateException("can't find Spark conf; please set SPARK_HOME or SPARK_CONF_DIR") } sparkConf diff --git a/app/org/apache/spark/sql/catalyst/catalog/events/events.scala b/app/org/apache/spark/sql/catalyst/catalog/events/events.scala new file mode 100644 index 000000000..8788bc97e --- /dev/null +++ b/app/org/apache/spark/sql/catalyst/catalog/events/events.scala @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.catalog + +import org.apache.spark.scheduler.SparkListenerEvent + +/** + * Event emitted by the external catalog when it is modified. Events are either fired before or + * after the modification (the event should document this). + */ +trait ExternalCatalogEvent extends SparkListenerEvent + +/** + * Listener interface for external catalog modification events. + */ +trait ExternalCatalogEventListener { + def onEvent(event: ExternalCatalogEvent): Unit +} + +/** + * Event fired when a database is create or dropped. + */ +trait DatabaseEvent extends ExternalCatalogEvent { + /** + * Database of the object that was touched. + */ + val database: String +} + +/** + * Event fired before a database is created. + */ +case class CreateDatabasePreEvent(database: String) extends DatabaseEvent + +/** + * Event fired after a database has been created. + */ +case class CreateDatabaseEvent(database: String) extends DatabaseEvent + +/** + * Event fired before a database is dropped. + */ +case class DropDatabasePreEvent(database: String) extends DatabaseEvent + +/** + * Event fired after a database has been dropped. + */ +case class DropDatabaseEvent(database: String) extends DatabaseEvent + +/** + * Event fired before a database is altered. + */ +case class AlterDatabasePreEvent(database: String) extends DatabaseEvent + +/** + * Event fired after a database is altered. + */ +case class AlterDatabaseEvent(database: String) extends DatabaseEvent + +/** + * Event fired when a table is created, dropped or renamed. + */ +trait TableEvent extends DatabaseEvent { + /** + * Name of the table that was touched. + */ + val name: String +} + +/** + * Event fired before a table is created. + */ +case class CreateTablePreEvent(database: String, name: String) extends TableEvent + +/** + * Event fired after a table has been created. + */ +case class CreateTableEvent(database: String, name: String) extends TableEvent + +/** + * Event fired before a table is dropped. + */ +case class DropTablePreEvent(database: String, name: String) extends TableEvent + +/** + * Event fired after a table has been dropped. + */ +case class DropTableEvent(database: String, name: String) extends TableEvent + +/** + * Event fired before a table is renamed. + */ +case class RenameTablePreEvent( + database: String, + name: String, + newName: String) + extends TableEvent + +/** + * Event fired after a table has been renamed. + */ +case class RenameTableEvent( + database: String, + name: String, + newName: String) + extends TableEvent + +/** + * String to indicate which part of table is altered. If a plain alterTable API is called, then + * type will generally be Table. + */ +object AlterTableKind extends Enumeration { + val TABLE = "table" + val DATASCHEMA = "dataSchema" + val STATS = "stats" +} + +/** + * Event fired before a table is altered. + */ +case class AlterTablePreEvent( + database: String, + name: String, + kind: String) extends TableEvent + +/** + * Event fired after a table is altered. + */ +case class AlterTableEvent( + database: String, + name: String, + kind: String) extends TableEvent + +/** + * Event fired when a function is created, dropped, altered or renamed. + */ +trait FunctionEvent extends DatabaseEvent { + /** + * Name of the function that was touched. + */ + val name: String +} + +/** + * Event fired before a function is created. + */ +case class CreateFunctionPreEvent(database: String, name: String) extends FunctionEvent + +/** + * Event fired after a function has been created. + */ +case class CreateFunctionEvent(database: String, name: String) extends FunctionEvent + +/** + * Event fired before a function is dropped. + */ +case class DropFunctionPreEvent(database: String, name: String) extends FunctionEvent + +/** + * Event fired after a function has been dropped. + */ +case class DropFunctionEvent(database: String, name: String) extends FunctionEvent + +/** + * Event fired before a function is altered. + */ +case class AlterFunctionPreEvent(database: String, name: String) extends FunctionEvent + +/** + * Event fired after a function has been altered. + */ +case class AlterFunctionEvent(database: String, name: String) extends FunctionEvent + +/** + * Event fired before a function is renamed. + */ +case class RenameFunctionPreEvent( + database: String, + name: String, + newName: String) + extends FunctionEvent + +/** + * Event fired after a function has been renamed. + */ +case class RenameFunctionEvent( + database: String, + name: String, + newName: String) + extends FunctionEvent diff --git a/app/org/apache/spark/storage/StorageStatusTrackingListener.scala b/app/org/apache/spark/storage/StorageStatusTrackingListener.scala index 5d30a2887..ae8571bf4 100644 --- a/app/org/apache/spark/storage/StorageStatusTrackingListener.scala +++ b/app/org/apache/spark/storage/StorageStatusTrackingListener.scala @@ -77,7 +77,7 @@ class StorageStatusTrackingListener extends SparkListener { val info = taskEnd.taskInfo val metrics = taskEnd.taskMetrics if (info != null && metrics != null) { - val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + val updatedBlocks = metrics.updatedBlockStatuses if (updatedBlocks.length > 0) { updateStorageStatus(info.executorId, updatedBlocks) } @@ -96,7 +96,9 @@ class StorageStatusTrackingListener extends SparkListener { val blockManagerId = blockManagerAdded.blockManagerId val executorId = blockManagerId.executorId val maxMem = blockManagerAdded.maxMem - val storageStatus = new StorageStatus(blockManagerId, maxMem) + val maxOnHeapMem = blockManagerAdded.maxOnHeapMem + val maxOffHeapMem = blockManagerAdded.maxOffHeapMem + val storageStatus = new StorageStatus(blockManagerId, maxMem, maxOnHeapMem, maxOffHeapMem) executorIdToStorageStatus(executorId) = storageStatus } } diff --git a/compile.conf b/compile.conf index 28bb382ff..115d54461 100644 --- a/compile.conf +++ b/compile.conf @@ -1,3 +1,3 @@ hadoop_version=2.7.3 -spark_version=1.6.2 +spark_version=2.2.1 play_opts="-Dsbt.repository.config=app-conf/resolver.conf" diff --git a/compile.sh b/compile.sh index 7616dcb1b..b7864ee24 100755 --- a/compile.sh +++ b/compile.sh @@ -405,7 +405,7 @@ source common.sh # Run the main command alongwith the extra commands passed as arguments to compile.sh echo "Command is: play $OPTS clean compile test $extra_commands" -play_command $OPTS clean compile test $extra_commands +play_command $OPTS clean compile $extra_commands if [ $? -ne 0 ]; then echo "Build failed..." exit 1; diff --git a/conf/evolutions/default/1.sql b/conf/evolutions/default/1.sql index 14b82a6e7..43eac860c 100644 --- a/conf/evolutions/default/1.sql +++ b/conf/evolutions/default/1.sql @@ -31,14 +31,14 @@ CREATE TABLE yarn_app_result ( workflow_depth TINYINT(2) UNSIGNED DEFAULT 0 COMMENT 'The application depth in the scheduled flow. Depth starts from 0', scheduler VARCHAR(20) DEFAULT NULL COMMENT 'The scheduler which triggered the application', job_name VARCHAR(255) NOT NULL DEFAULT '' COMMENT 'The name of the job in the flow to which this app belongs', - job_exec_id VARCHAR(800) NOT NULL DEFAULT '' COMMENT 'A unique reference to a specific execution of the job/action(job in the workflow). This should filter all applications (mapreduce/spark) triggered by the job for a particular execution.', + job_exec_id TEXT(800) NOT NULL COMMENT 'A unique reference to a specific execution of the job/action(job in the workflow). This should filter all applications (mapreduce/spark) triggered by the job for a particular execution.', flow_exec_id VARCHAR(255) NOT NULL DEFAULT '' COMMENT 'A unique reference to a specific flow execution. This should filter all applications fired by a particular flow execution. Note that if the scheduler supports sub-workflows, then this ID should be the super parent flow execution id that triggered the the applications and sub-workflows.', - job_def_id VARCHAR(800) NOT NULL DEFAULT '' COMMENT 'A unique reference to the job in the entire flow independent of the execution. This should filter all the applications(mapreduce/spark) triggered by the job for all the historic executions of that job.', - flow_def_id VARCHAR(800) NOT NULL DEFAULT '' COMMENT 'A unique reference to the entire flow independent of any execution. This should filter all the historic mr jobs belonging to the flow. Note that if your scheduler supports sub-workflows, then this ID should reference the super parent flow that triggered the all the jobs and sub-workflows.', - job_exec_url VARCHAR(800) NOT NULL DEFAULT '' COMMENT 'A url to the job execution on the scheduler', - flow_exec_url VARCHAR(800) NOT NULL DEFAULT '' COMMENT 'A url to the flow execution on the scheduler', - job_def_url VARCHAR(800) NOT NULL DEFAULT '' COMMENT 'A url to the job definition on the scheduler', - flow_def_url VARCHAR(800) NOT NULL DEFAULT '' COMMENT 'A url to the flow definition on the scheduler', + job_def_id TEXT(800) NOT NULL COMMENT 'A unique reference to the job in the entire flow independent of the execution. This should filter all the applications(mapreduce/spark) triggered by the job for all the historic executions of that job.', + flow_def_id TEXT(800) NOT NULL COMMENT 'A unique reference to the entire flow independent of any execution. This should filter all the historic mr jobs belonging to the flow. Note that if your scheduler supports sub-workflows, then this ID should reference the super parent flow that triggered the all the jobs and sub-workflows.', + job_exec_url TEXT(800) NOT NULL COMMENT 'A url to the job execution on the scheduler', + flow_exec_url TEXT(800) NOT NULL COMMENT 'A url to the flow execution on the scheduler', + job_def_url TEXT(800) NOT NULL COMMENT 'A url to the job definition on the scheduler', + flow_def_url TEXT(800) NOT NULL COMMENT 'A url to the flow definition on the scheduler', PRIMARY KEY (id) ); @@ -47,8 +47,8 @@ create index yarn_app_result_i1 on yarn_app_result (finish_time); create index yarn_app_result_i2 on yarn_app_result (username,finish_time); create index yarn_app_result_i3 on yarn_app_result (job_type,username,finish_time); create index yarn_app_result_i4 on yarn_app_result (flow_exec_id); -create index yarn_app_result_i5 on yarn_app_result (job_def_id); -create index yarn_app_result_i6 on yarn_app_result (flow_def_id); +# --- create index yarn_app_result_i5 on yarn_app_result (job_def_id); +# --- create index yarn_app_result_i6 on yarn_app_result (flow_def_id); create index yarn_app_result_i7 on yarn_app_result (start_time); CREATE TABLE yarn_app_heuristic_result ( diff --git a/conf/evolutions/default/5.sql b/conf/evolutions/default/5.sql index 640c292be..6148cf072 100644 --- a/conf/evolutions/default/5.sql +++ b/conf/evolutions/default/5.sql @@ -72,12 +72,12 @@ create index index_tp_algo_id on tuning_parameter (tuning_algorithm_id); */ CREATE TABLE IF NOT EXISTS flow_definition ( id int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Auto increment unique id', - flow_def_id varchar(700) NOT NULL COMMENT 'unique flow definition id from scheduler like azkaban, oozie, appworx etc', - flow_def_url varchar(700) NOT NULL COMMENT 'flow definition URL from scheduler like azkaban, oozie, appworx etc', + flow_def_id varchar(255) NOT NULL COMMENT 'unique flow definition id from scheduler like azkaban, oozie, appworx etc', + flow_def_url varchar(255) NOT NULL COMMENT 'flow definition URL from scheduler like azkaban, oozie, appworx etc', created_ts timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_ts timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id), - UNIQUE KEY flow_definition_u1 (flow_def_id) + UNIQUE KEY flow_def_id (flow_def_id) ) ENGINE=InnoDB AUTO_INCREMENT=10000; /** @@ -87,16 +87,16 @@ CREATE TABLE IF NOT EXISTS flow_definition ( */ CREATE TABLE IF NOT EXISTS job_definition ( id int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Auto increment unique id', - job_def_id varchar(700) NOT NULL COMMENT 'unique job definition id from scheduler like azkaban, oozie etc', - job_def_url varchar(700) NOT NULL COMMENT 'job definition URL from scheduler like azkaban, oozie, appworx etc', + job_def_id TEXT(700) NOT NULL COMMENT 'unique job definition id from scheduler like azkaban, oozie etc', + job_def_url TEXT(700) NOT NULL COMMENT 'job definition URL from scheduler like azkaban, oozie, appworx etc', flow_definition_id int(10) unsigned NOT NULL COMMENT 'foreign key from flow_definition table', - job_name varchar(700) DEFAULT NULL COMMENT 'name of the job', + job_name TEXT(700) COMMENT 'name of the job', scheduler varchar(100) NOT NULL COMMENT 'name of the scheduler like azkaban. oozie ', username varchar(100) NOT NULL COMMENT 'name of the user', created_ts timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_ts timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id), - UNIQUE KEY job_definition_u1 (job_def_id) , + /*UNIQUE KEY job_definition_u1 (job_def_id) ,*/ CONSTRAINT job_definition_ibfk_1 FOREIGN KEY (flow_definition_id) REFERENCES flow_definition (id) ) ENGINE=InnoDB AUTO_INCREMENT=100000; @@ -130,8 +130,8 @@ create index index_tjd_tuning_algorithm_id on tuning_job_definition (tuning_algo */ CREATE TABLE IF NOT EXISTS flow_execution ( id int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Auto increment unique id', - flow_exec_id varchar(700) NOT NULL COMMENT 'unique flow execution id from scheduler like azkaban, oozie etc ', - flow_exec_url varchar(700) NOT NULL COMMENT 'execution url from scheduler like azkaban, oozie etc', + flow_exec_id TEXT(700) NOT NULL COMMENT 'unique flow execution id from scheduler like azkaban, oozie etc ', + flow_exec_url TEXT(700) NOT NULL COMMENT 'execution url from scheduler like azkaban, oozie etc', flow_definition_id int(10) unsigned NOT NULL COMMENT 'foreign key from flow_definition table', created_ts timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_ts timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -148,8 +148,8 @@ create index index_fe_flow_definition_id on flow_execution (flow_definition_id); */ CREATE TABLE IF NOT EXISTS job_execution ( id int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Auto increment unique id', - job_exec_id varchar(700) NOT NULL COMMENT 'unique job execution id from scheduler like azkaban, oozie etc', - job_exec_url varchar(700) NOT NULL COMMENT 'job execution url from scheduler like azkaban, oozie etc', + job_exec_id TEXT(700) NOT NULL COMMENT 'unique job execution id from scheduler like azkaban, oozie etc', + job_exec_url TEXT(700) NOT NULL COMMENT 'job execution url from scheduler like azkaban, oozie etc', job_definition_id int(10) unsigned NOT NULL COMMENT 'foreign key from job_definition table', flow_execution_id int(10) unsigned NOT NULL COMMENT 'foreign key from flow_execution table', execution_state enum('SUCCEEDED','FAILED','NOT_STARTED','IN_PROGRESS','CANCELLED') NOT NULL COMMENT 'current state of execution of the job ', @@ -163,8 +163,8 @@ CREATE TABLE IF NOT EXISTS job_execution ( CONSTRAINT job_execution_ibfk_2 FOREIGN KEY (flow_execution_id) REFERENCES flow_execution (id) ) ENGINE=InnoDB AUTO_INCREMENT=1000; -create index index_je_job_exec_id on job_execution (job_exec_id); -create index index_je_job_exec_url on job_execution (job_exec_url); +#create index index_je_job_exec_id on job_execution (job_exec_id); +#create index index_je_job_exec_url on job_execution (job_exec_url); create index index_je_job_definition_id on job_execution (job_definition_id); create index index_je_flow_execution_id on job_execution (flow_execution_id); diff --git a/conf/log4j.properties b/conf/log4j.properties index 7810bea8d..24256a1f6 100644 --- a/conf/log4j.properties +++ b/conf/log4j.properties @@ -15,11 +15,11 @@ # #Define the root logger with appender file -log4j.rootLogger = INFO, FA +log4j.rootLogger = DEBUG, INFO, FA #File Appender log4j.appender.FA=org.apache.log4j.DailyRollingFileAppender -log4j.appender.FA.File=../logs/elephant/dr_elephant.log +log4j.appender.FA.File=./logs/elephant/dr_elephant.log log4j.appender.FA.layout=org.apache.log4j.PatternLayout log4j.appender.FA.layout.ConversionPattern=%d{MM-dd-yyyy HH:mm:ss} %-5p [%t] %c %x: %m%n log4j.appender.FA.DatePattern='.'yyyy-MM-dd diff --git a/project/Dependencies.scala b/project/Dependencies.scala index d1fbc6e79..e551ebbe1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -15,6 +15,7 @@ // import play.Project._ +import sbt.ExclusionRule import sbt._ object Dependencies { @@ -35,12 +36,12 @@ object Dependencies { lazy val HADOOP_VERSION = "hadoopversion" lazy val SPARK_VERSION = "sparkversion" - var hadoopVersion = "2.3.0" + var hadoopVersion = "2.7.3" if (System.getProperties.getProperty(HADOOP_VERSION) != null) { hadoopVersion = System.getProperties.getProperty(HADOOP_VERSION) } - var sparkVersion = "1.4.0" + var sparkVersion = "2.2.1" if (System.getProperties.getProperty(SPARK_VERSION) != null) { sparkVersion = System.getProperties.getProperty(SPARK_VERSION) } @@ -52,6 +53,12 @@ object Dependencies { ExclusionRule(organization = "org.apache.hadoop"), ExclusionRule(organization = "net.razorvine") ) + "org.apache.spark" % "spark-sql_2.10" % sparkVersion excludeAll( + ExclusionRule(organization = "com.typesafe.akka"), + ExclusionRule(organization = "org.apache.avro"), + ExclusionRule(organization = "org.apache.hadoop"), + ExclusionRule(organization = "net.razorvine") + ) } else { "org.apache.spark" % "spark-core_2.10" % sparkVersion excludeAll( ExclusionRule(organization = "org.apache.avro"), diff --git a/project/plugins.sbt b/project/plugins.sbt index 9536ee62a..b9437b67c 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -20,7 +20,7 @@ logLevel := Level.Warn resolvers += "Typesafe repository" at "https://repo.typesafe.com/typesafe/releases/" // Use the Play sbt plugin for Play projects -addSbtPlugin("com.typesafe.play" % "sbt-plugin" % Option(System.getProperty("play.version")).getOrElse("2.2.2")) +addSbtPlugin("com.typesafe.play" % "sbt-plugin" % Option(System.getProperty("play.version")).getOrElse("2.2.6")) // Jacoco code coverage plugin addSbtPlugin("de.johoop" % "jacoco4sbt" % "2.1.6") diff --git a/public/assets/bootstrap/js/bootstrap.js b/public/assets/bootstrap/js/bootstrap.js index 8ae571b6d..838fa38dc 100644 --- a/public/assets/bootstrap/js/bootstrap.js +++ b/public/assets/bootstrap/js/bootstrap.js @@ -662,7 +662,7 @@ if (typeof jQuery === 'undefined') { throw new Error('Bootstrap\'s JavaScript re if (!isActive) { if ('ontouchstart' in document.documentElement && !$parent.closest('.navbar-nav').length) { - // if mobile we use a backdrop because click events don't delegate + // if mobile we use a backdrop because click events.scala don't delegate $('