Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC of code level optimization recommendation for Hive #618

Open
wants to merge 10 commits into
base: tuning_20190221
Choose a base branch
from
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: scala
sudo: true
jdk:
- oraclejdk8
- openjdk8
scala:
- 2.10.4 # should match version in build.sbt
python: "2.6"
Expand Down
81 changes: 68 additions & 13 deletions app/com/linkedin/drelephant/analysis/AnalyticJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@
package com.linkedin.drelephant.analysis;

import com.linkedin.drelephant.ElephantContext;
import com.linkedin.drelephant.analysis.code.extractors.CodeExtractionFactory;
import com.linkedin.drelephant.analysis.code.util.CodeAnalyzerException;
import com.linkedin.drelephant.analysis.code.CodeExtractor;
import com.linkedin.drelephant.analysis.code.CodeOptimizer;
import com.linkedin.drelephant.analysis.code.dataset.JobCodeInfoDataSet;
import com.linkedin.drelephant.analysis.code.dataset.Script;
import com.linkedin.drelephant.analysis.code.extractors.AzkabanJarvisCodeExtractor;
import com.linkedin.drelephant.analysis.code.optimizers.CodeOptimizerFactory;
import com.linkedin.drelephant.analysis.code.util.Constant;
import com.linkedin.drelephant.analysis.code.util.Helper;
import com.linkedin.drelephant.exceptions.core.ExceptionFingerprintingRunner;
import com.linkedin.drelephant.util.InfoExtractor;
import com.linkedin.drelephant.util.Utils;
Expand All @@ -26,8 +36,10 @@
import models.AppHeuristicResult;
import models.AppHeuristicResultDetails;
import models.AppResult;
import models.TuningJobExecutionCodeRecommendation;
import org.apache.log4j.Logger;
import com.linkedin.drelephant.exceptions.util.Constant.*;
import org.apache.hadoop.conf.Configuration;


/**
Expand All @@ -38,7 +50,7 @@ public class AnalyticJob {
private static final Logger logger = Logger.getLogger(AnalyticJob.class);

private static final String UNKNOWN_JOB_TYPE = "Unknown"; // The default job type when the data matches nothing.
private static final int _RETRY_LIMIT = 3; // Number of times a job needs to be tried before going into second retry queue
private static final int _RETRY_LIMIT = 3; // Number of times a job needs to be tried before going into second retry queue
private static final int _SECOND_RETRY_LIMIT = 5; // Number of times a job needs to be tried before dropping
private static final String EXCLUDE_JOBTYPE = "exclude_jobtypes_filter"; // excluded Job Types for heuristic

Expand Down Expand Up @@ -268,7 +280,8 @@ public AppResult getAnalysis() throws Exception {
String confExcludedApps = heuristic.getHeuristicConfData().getParamMap().get(EXCLUDE_JOBTYPE);

if (confExcludedApps == null || confExcludedApps.length() == 0 ||
!Arrays.asList(confExcludedApps.split(",")).contains(jobTypeName)) {
!Arrays.asList(confExcludedApps.split(","))
.contains(jobTypeName)) {
HeuristicResult result = heuristic.apply(data);
if (result != null) {
analysisResults.add(result);
Expand Down Expand Up @@ -301,23 +314,25 @@ public AppResult getAnalysis() throws Exception {
Severity worstSeverity = Severity.NONE;
for (HeuristicResult heuristicResult : analysisResults) {
AppHeuristicResult detail = new AppHeuristicResult();
detail.heuristicClass = Utils.truncateField(heuristicResult.getHeuristicClassName(),
AppHeuristicResult.HEURISTIC_CLASS_LIMIT, getAppId());
detail.heuristicName = Utils.truncateField(heuristicResult.getHeuristicName(),
AppHeuristicResult.HEURISTIC_NAME_LIMIT, getAppId());
detail.heuristicClass =
Utils.truncateField(heuristicResult.getHeuristicClassName(), AppHeuristicResult.HEURISTIC_CLASS_LIMIT,
getAppId());
detail.heuristicName =
Utils.truncateField(heuristicResult.getHeuristicName(), AppHeuristicResult.HEURISTIC_NAME_LIMIT, getAppId());
detail.severity = heuristicResult.getSeverity();
detail.score = heuristicResult.getScore();

// Load Heuristic Details
for (HeuristicResultDetails heuristicResultDetails : heuristicResult.getHeuristicResultDetails()) {
AppHeuristicResultDetails heuristicDetail = new AppHeuristicResultDetails();
heuristicDetail.yarnAppHeuristicResult = detail;
heuristicDetail.name = Utils.truncateField(heuristicResultDetails.getName(),
AppHeuristicResultDetails.NAME_LIMIT, getAppId());
heuristicDetail.value = Utils.truncateField(heuristicResultDetails.getValue(),
AppHeuristicResultDetails.VALUE_LIMIT, getAppId());
heuristicDetail.details = Utils.truncateField(heuristicResultDetails.getDetails(),
AppHeuristicResultDetails.DETAILS_LIMIT, getAppId());
heuristicDetail.name =
Utils.truncateField(heuristicResultDetails.getName(), AppHeuristicResultDetails.NAME_LIMIT, getAppId());
heuristicDetail.value =
Utils.truncateField(heuristicResultDetails.getValue(), AppHeuristicResultDetails.VALUE_LIMIT, getAppId());
heuristicDetail.details =
Utils.truncateField(heuristicResultDetails.getDetails(), AppHeuristicResultDetails.DETAILS_LIMIT,
getAppId());
// This was added for AnalyticTest. Commenting this out to fix a bug. Also disabling AnalyticJobTest.
//detail.yarnAppHeuristicResultDetails = new ArrayList<AppHeuristicResultDetails>();
detail.yarnAppHeuristicResultDetails.add(heuristicDetail);
Expand All @@ -331,6 +346,16 @@ public AppResult getAnalysis() throws Exception {

// Retrieve information from job configuration like scheduler information and store them into result.
InfoExtractor.loadInfo(result, data);

//Code Level optimization
try {
if (ElephantContext.instance().getAutoTuningConf().getBoolean(Constant.CODE_LEVEL_OPTIMIZATION_ENABLED, true)) {
codeLevelOptimization(result);
}
} catch (CodeAnalyzerException e) {
logger.error("Error in analysing the code for " + result.jobExecId, e);
}

/**
* Exception fingerprinting is applied (if required)
*/
Expand All @@ -341,6 +366,36 @@ public AppResult getAnalysis() throws Exception {
return result;
}

/**
 * Runs code-level analysis for the job behind {@code result}: builds the code-analysis
 * configuration, locates the job's source code via the extractor factory, and — if an
 * optimizer is registered for that code — applies it and persists the recommendation.
 *
 * @param result application result holding the job identifiers used to locate the code
 * @throws CodeAnalyzerException if code extraction or analysis fails
 */
private void codeLevelOptimization(AppResult result) throws CodeAnalyzerException {
  logger.info(" Code Analysis Process Started " + result.queueName);
  Configuration autoTuningConf = ElephantContext.instance().getAutoTuningConf();
  Helper.ConfigurationBuilder.buildConfigurations(autoTuningConf);
  CodeExtractor extractor = CodeExtractionFactory.getCodeExtractor(result);
  JobCodeInfoDataSet codeInfo = extractor.execute(result);
  // Guard clause: nothing to do when no code was extracted or no optimizer exists for it.
  if (codeInfo == null || codeInfo.getCodeOptimizer() == null) {
    return;
  }
  logger.info(" Optimizer is available for the code , hence optimizing it " + codeInfo);
  Script optimizedScript = codeInfo.getCodeOptimizer().execute(codeInfo.getSourceCode());
  saveOptimizationResultIntoDB(optimizedScript, codeInfo, result);
}

/**
 * Persists a code-optimization recommendation to the DB, but only when the analyzed
 * script actually produced a non-empty optimization comment.
 *
 * @param script             analyzed script carrying the optimization comment (may be null)
 * @param jobCodeInfoDataSet code metadata (SCM, repo, file name) and the optimizer used
 * @param result             application result supplying the job definition/execution ids
 */
private void saveOptimizationResultIntoDB(Script script, JobCodeInfoDataSet jobCodeInfoDataSet, AppResult result) {
  // Guard clause: skip persistence when analysis yielded no recommendation.
  if (script == null || script.getOptimizationComment().length() == 0) {
    return;
  }
  TuningJobExecutionCodeRecommendation recommendation = new TuningJobExecutionCodeRecommendation();
  recommendation.jobDefId = result.jobDefId;
  recommendation.jobExecUrl = result.jobExecUrl;
  // Encode the code's location as a single delimited string for storage.
  recommendation.codeLocation =
      "SCM=" + jobCodeInfoDataSet.getScmType() + ",RepoName=" + jobCodeInfoDataSet.getRepoName() + ",FileName="
          + jobCodeInfoDataSet.getFileName();
  recommendation.recommendation = script.getOptimizationComment().toString();
  logger.info(" Severity of the script is " + jobCodeInfoDataSet.getCodeOptimizer().getSeverity());
  recommendation.severity = jobCodeInfoDataSet.getCodeOptimizer().getSeverity();
  recommendation.save();
}

/**
*
* @param result
Expand Down Expand Up @@ -372,7 +427,7 @@ public boolean applyExceptionFingerprinting(AppResult result, HadoopApplicationD
*
* @return true if should retry, else false
*/
public boolean isSecondPhaseRetry(){
public boolean isSecondPhaseRetry() {
  // Post-increment is intentional: each call consumes one attempt from the
  // second-phase retry budget as a side effect of the check.
  return (_secondRetries++) < _SECOND_RETRY_LIMIT;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*/
public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator {
private static final Logger logger = Logger.getLogger(AnalyticJobGeneratorHadoop2.class);
boolean debugEnabled = logger.isDebugEnabled();
private boolean debugEnabled = logger.isDebugEnabled();
private static final String RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.webapp.address";
private static final String IS_RM_HA_ENABLED = "yarn.resourcemanager.ha.enabled";
private static final String RESOURCE_MANAGER_IDS = "yarn.resourcemanager.ha.rm-ids";
Expand Down Expand Up @@ -259,6 +259,7 @@ private List<AnalyticJob> readApps(URL url, boolean isSucceeded) throws IOExcept
String amHostHttpAddress = app.get("amHostHttpAddress").getValueAsText();
String jobState = app.get("state").getValueAsText();


if (debugEnabled) {
logger.debug(" AM Container logs URL " + amContainerLogsURL);
logger.debug(" AM Host HTTP Address " + amHostHttpAddress);
Expand All @@ -268,9 +269,8 @@ private List<AnalyticJob> readApps(URL url, boolean isSucceeded) throws IOExcept
ApplicationType type =
ElephantContext.instance().getApplicationTypeForName(app.get("applicationType").getValueAsText());


AnalyticJob analyticJob = new AnalyticJob();
logger.info(" Analysis job " + analyticJob.getTrackingUrl());
// logger.info(" Analysis job " + analyticJob.getTrackingUrl());
analyticJob.setAppId(appId)
.setAppType(type)
.setUser(user)
Expand All @@ -284,7 +284,6 @@ private List<AnalyticJob> readApps(URL url, boolean isSucceeded) throws IOExcept
.setAmHostHttpAddress(amHostHttpAddress)
.setState(jobState);
appList.add(analyticJob);

}
}
return appList;
Expand Down
68 changes: 68 additions & 0 deletions app/com/linkedin/drelephant/analysis/code/CodeExtractor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.analysis.code;

import com.linkedin.drelephant.analysis.code.dataset.JobCodeInfoDataSet;
import com.linkedin.drelephant.analysis.code.util.CodeAnalyzerException;
import java.io.IOException;
import java.net.MalformedURLException;
import models.AppResult;
import org.codehaus.jettison.json.JSONException;


/**
 * Interface for code extractors, which locate and fetch the source code behind an
 * executed application. One implementation is {@link AzkabanJarvisCodeExtractor}.
 */
public interface CodeExtractor {

  /**
   * Checks whether the prerequisites are met to extract the code.
   * @param appResult : Information about the application executed on cluster
   * @return true if extraction can proceed, false otherwise
   */
  boolean arePrerequisiteMatched(AppResult appResult);

  /**
   * Resolves the name of the source-code file for the given application.
   * @param appResult : Information about the application executed on cluster
   * @return : Code file name
   * @throws CodeAnalyzerException : Throws custom exception
   */
  String getCodeFileName(AppResult appResult) throws CodeAnalyzerException;

  /**
   * Runs the extraction end-to-end for the given application.
   * @param appResult: Information about the application executed on cluster
   * @return : Dataset which contains the source code and all relevant information about it
   * @throws CodeAnalyzerException : Throws custom exception
   */
  JobCodeInfoDataSet execute(AppResult appResult) throws CodeAnalyzerException;

  /**
   * Parses the raw code-location payload and populates the extractor's state.
   * @param codeInformation : Contains code information (location metadata)
   * @throws IOException : on failures reading the code location
   * @throws JSONException : if the payload cannot be parsed as JSON
   */
  void processCodeLocationInformation(String codeInformation) throws IOException, JSONException;

  /**
   * Returns the {@link JobCodeInfoDataSet} containing information about the job's code.
   */
  JobCodeInfoDataSet getJobCodeInfoDataSet();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.analysis.code;

import com.linkedin.drelephant.analysis.code.dataset.Script;
import com.linkedin.drelephant.analysis.code.util.CodeAnalyzerException;


/**
 * This interface defines the methods describing an optimization rule.
 * Optimization rules scan a script and find possible optimizations in it.
 * Each rule also reports a severity indicating how important it is to act
 * on the optimization it found.
 */
public interface CodeOptimizationRule {
  /**
   * Main entry point: executes the rule against the given script.
   * @param script : Script, which contains framework-parsable code
   * @throws CodeAnalyzerException : It can throw CodeAnalyzerException
   */
  void processRule(Script script) throws CodeAnalyzerException;

  /**
   * Every rule must have a name.
   * @return Rule name
   */
  String getRuleName();

  /**
   * After analyzing the script, the rule provides a severity (how important
   * it is to fix the script as per this rule).
   * @return severity of the findings produced by this rule
   */
  String getSeverity();
}
59 changes: 59 additions & 0 deletions app/com/linkedin/drelephant/analysis/code/CodeOptimizer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.linkedin.drelephant.analysis.code;

import com.linkedin.drelephant.analysis.code.dataset.Code;
import com.linkedin.drelephant.analysis.code.dataset.Script;
import java.util.List;


/**
 * Code Optimizer interface; one concrete implementation
 * is HiveCodeOptimizer.
 */
public interface CodeOptimizer {
  /**
   * Converts raw source code into {@code Code} (a list of statements).
   * @param sourceCode : The source code to parse
   * @return Return Code : A format on which optimizations can be applied.
   */
  Code generateCode(String sourceCode);

  /**
   * Returns the severity of the code. High severity means it should be fixed
   * immediately.
   * @return overall severity determined by the optimizer
   */
  String getSeverity();

  /**
   * This is the brain of the optimization. It applies the given rules to
   * optimize the code in place.
   * @param sourceScript : Source code wrapped as a Script
   * @param rules : Rules to apply to optimize the code.
   */

  void optimizationEngine(Script sourceScript, List<CodeOptimizationRule> rules);

  /**
   * Runs the full pipeline: parse the source and apply the optimizer's rules.
   * @param sourceCode Source code of the script
   * @return : Script, which contains optimization comments and also Code.
   */

  Script execute(String sourceCode);
}

Loading