Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC of code level optimization recommendation for Hive #618

Open
wants to merge 10 commits into
base: tuning_20190221
Choose a base branch
from
2 changes: 1 addition & 1 deletion app/com/linkedin/drelephant/ElephantRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private void loadAnalyticJobGenerator() {
}

try {
_analyticJobGenerator.configure(ElephantContext.instance().getGeneralConf());
_analyticJobGenerator.configure(ElephantContext.instance().getGeneralConf());
} catch (Exception e) {
logger.error("Error occurred when configuring the analysis provider.", e);
throw new RuntimeException(e);
Expand Down
72 changes: 57 additions & 15 deletions app/com/linkedin/drelephant/analysis/AnalyticJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
package com.linkedin.drelephant.analysis;

import com.linkedin.drelephant.ElephantContext;
import com.linkedin.drelephant.analysis.code.CodeAnalyzerException;
import com.linkedin.drelephant.analysis.code.CodeExtractor;
import com.linkedin.drelephant.analysis.code.CodeOptimizer;
import com.linkedin.drelephant.analysis.code.JobCodeInfoDataSet;
import com.linkedin.drelephant.analysis.code.Script;
import com.linkedin.drelephant.analysis.code.impl.AzkabanJarvisCodeExtractor;
import com.linkedin.drelephant.analysis.code.impl.CodeOptimizerFactory;
import com.linkedin.drelephant.exceptions.core.ExceptionFingerprintingRunner;
import com.linkedin.drelephant.util.InfoExtractor;
import com.linkedin.drelephant.util.Utils;
Expand All @@ -26,6 +33,7 @@
import models.AppHeuristicResult;
import models.AppHeuristicResultDetails;
import models.AppResult;
import models.TuningJobExecutionCodeRecommendation;
import org.apache.log4j.Logger;
import com.linkedin.drelephant.exceptions.util.Constant.*;

Expand All @@ -38,7 +46,8 @@ public class AnalyticJob {
private static final Logger logger = Logger.getLogger(AnalyticJob.class);

private static final String UNKNOWN_JOB_TYPE = "Unknown"; // The default job type when the data matches nothing.
private static final int _RETRY_LIMIT = 3; // Number of times a job needs to be tried before going into second retry queue
private static final int _RETRY_LIMIT = 3;
// Number of times a job needs to be tried before going into second retry queue
private static final int _SECOND_RETRY_LIMIT = 5; // Number of times a job needs to be tried before dropping
private static final String EXCLUDE_JOBTYPE = "exclude_jobtypes_filter"; // excluded Job Types for heuristic

Expand Down Expand Up @@ -267,8 +276,8 @@ public AppResult getAnalysis() throws Exception {
for (Heuristic heuristic : heuristics) {
String confExcludedApps = heuristic.getHeuristicConfData().getParamMap().get(EXCLUDE_JOBTYPE);

if (confExcludedApps == null || confExcludedApps.length() == 0 ||
!Arrays.asList(confExcludedApps.split(",")).contains(jobTypeName)) {
if (confExcludedApps == null || confExcludedApps.length() == 0 || !Arrays.asList(confExcludedApps.split(","))
.contains(jobTypeName)) {
HeuristicResult result = heuristic.apply(data);
if (result != null) {
analysisResults.add(result);
Expand All @@ -277,7 +286,8 @@ public AppResult getAnalysis() throws Exception {
}
}

HadoopMetricsAggregator hadoopMetricsAggregator = ElephantContext.instance().getAggregatorForApplicationType(getAppType());
HadoopMetricsAggregator hadoopMetricsAggregator =
ElephantContext.instance().getAggregatorForApplicationType(getAppType());
hadoopMetricsAggregator.aggregate(data);
HadoopAggregatedData hadoopAggregatedData = hadoopMetricsAggregator.getResult();

Expand All @@ -301,23 +311,25 @@ public AppResult getAnalysis() throws Exception {
Severity worstSeverity = Severity.NONE;
for (HeuristicResult heuristicResult : analysisResults) {
AppHeuristicResult detail = new AppHeuristicResult();
detail.heuristicClass = Utils.truncateField(heuristicResult.getHeuristicClassName(),
AppHeuristicResult.HEURISTIC_CLASS_LIMIT, getAppId());
detail.heuristicName = Utils.truncateField(heuristicResult.getHeuristicName(),
AppHeuristicResult.HEURISTIC_NAME_LIMIT, getAppId());
detail.heuristicClass =
Utils.truncateField(heuristicResult.getHeuristicClassName(), AppHeuristicResult.HEURISTIC_CLASS_LIMIT,
getAppId());
detail.heuristicName =
Utils.truncateField(heuristicResult.getHeuristicName(), AppHeuristicResult.HEURISTIC_NAME_LIMIT, getAppId());
detail.severity = heuristicResult.getSeverity();
detail.score = heuristicResult.getScore();

// Load Heuristic Details
for (HeuristicResultDetails heuristicResultDetails : heuristicResult.getHeuristicResultDetails()) {
AppHeuristicResultDetails heuristicDetail = new AppHeuristicResultDetails();
heuristicDetail.yarnAppHeuristicResult = detail;
heuristicDetail.name = Utils.truncateField(heuristicResultDetails.getName(),
AppHeuristicResultDetails.NAME_LIMIT, getAppId());
heuristicDetail.value = Utils.truncateField(heuristicResultDetails.getValue(),
AppHeuristicResultDetails.VALUE_LIMIT, getAppId());
heuristicDetail.details = Utils.truncateField(heuristicResultDetails.getDetails(),
AppHeuristicResultDetails.DETAILS_LIMIT, getAppId());
heuristicDetail.name =
Utils.truncateField(heuristicResultDetails.getName(), AppHeuristicResultDetails.NAME_LIMIT, getAppId());
heuristicDetail.value =
Utils.truncateField(heuristicResultDetails.getValue(), AppHeuristicResultDetails.VALUE_LIMIT, getAppId());
heuristicDetail.details =
Utils.truncateField(heuristicResultDetails.getDetails(), AppHeuristicResultDetails.DETAILS_LIMIT,
getAppId());
// This was added for AnalyticTest. Commenting this out to fix a bug. Also disabling AnalyticJobTest.
//detail.yarnAppHeuristicResultDetails = new ArrayList<AppHeuristicResultDetails>();
detail.yarnAppHeuristicResultDetails.add(heuristicDetail);
Expand All @@ -331,6 +343,36 @@ public AppResult getAnalysis() throws Exception {

// Retrieve information from job configuration like scheduler information and store them into result.
InfoExtractor.loadInfo(result, data);
if (result.queueName.toLowerCase().equals("ump_normal") || result.queueName.toLowerCase().equals("ump_hp")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Giving out internal queue configuration. At a minimum make this configurable. Better model the conditional execution as an interface? Applies elsewhere too

logger.info("UMP Job and hence finding path");
try {
CodeExtractor codeExtractor = new AzkabanJarvisCodeExtractor();
JobCodeInfoDataSet dataJob = codeExtractor.execute(result);
if (dataJob != null && dataJob.getSourceCode() != null) {
CodeOptimizer codeOptimizer = CodeOptimizerFactory.getCodeOptimizer(dataJob.getFileName());
if (codeOptimizer != null) {
Script script = codeOptimizer.execute(dataJob.getSourceCode());
if (script.getOptimizationComment().length() > 0) {
TuningJobExecutionCodeRecommendation tuningJobExecutionCodeRecommendation =
new TuningJobExecutionCodeRecommendation();
tuningJobExecutionCodeRecommendation.jobDefId = result.jobDefId;
tuningJobExecutionCodeRecommendation.jobExecUrl = result.jobExecUrl;
tuningJobExecutionCodeRecommendation.codeLocation =
"SCM=" + dataJob.getScmType() + ",RepoName=" + dataJob.getRepoName() + ",FileName="
+ dataJob.getFileName();

tuningJobExecutionCodeRecommendation.recommendation = script.getOptimizationComment().toString();
logger.info(" Severity of the script is " + codeOptimizer.getSeverity());
tuningJobExecutionCodeRecommendation.severity = codeOptimizer.getSeverity();
tuningJobExecutionCodeRecommendation.save();
}
}
}
} catch (CodeAnalyzerException e) {
logger.error("Error comes while extracting code ", e);
}
}

/**
* Exception fingerprinting is applied (if required)
*/
Expand Down Expand Up @@ -372,7 +414,7 @@ public boolean applyExceptionFingerprinting(AppResult result, HadoopApplicationD
*
* @return true if should retry, else false
*/
public boolean isSecondPhaseRetry(){
public boolean isSecondPhaseRetry() {
return (_secondRetries++) < _SECOND_RETRY_LIMIT;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator {
private static final String IS_RM_HA_ENABLED = "yarn.resourcemanager.ha.enabled";
private static final String RESOURCE_MANAGER_IDS = "yarn.resourcemanager.ha.rm-ids";
private static final String RM_NODE_STATE_URL = "http://%s/ws/v1/cluster/info";
private static final String FETCH_INITIAL_WINDOW_MS = "drelephant.analysis.fetch.initial.windowMillis";
private static final String FETCH_INITIAL_WINDOW_MS = "drelephant.analysis.fetch.initial.windowMillis";
private static final String RM_APPLICATION_FILTER = "rm_application_filter";

private static Configuration configuration;
Expand Down Expand Up @@ -77,7 +77,7 @@ public void updateResourceManagerAddresses() {
if (Boolean.valueOf(configuration.get(IS_RM_HA_ENABLED))) {
String resourceManagers = configuration.get(RESOURCE_MANAGER_IDS);
if (resourceManagers != null) {
logger.info("The list of RM IDs are " + resourceManagers);
logger.info("The list of RM IDs are " + resourceManagers);
List<String> ids = Arrays.asList(resourceManagers.split(","));
_currentTime = System.currentTimeMillis();
updateAuthToken();
Expand All @@ -104,6 +104,7 @@ public void updateResourceManagerAddresses() {
}
} else {
_resourceManagerAddress = configuration.get(RESOURCE_MANAGER_ADDRESS);
logger.info(" Resource manager address test "+_resourceManagerAddress);
}
if (_resourceManagerAddress == null) {
throw new RuntimeException(
Expand All @@ -115,6 +116,7 @@ public void updateResourceManagerAddresses() {
@Override
public void configure(Configuration configuration)
throws IOException {

this.configuration = configuration;
this.elephantProperties = ElephantContext.instance().getElephnatConf();
String initialFetchWindowString = configuration.get(FETCH_INITIAL_WINDOW_MS);
Expand Down Expand Up @@ -151,6 +153,7 @@ public List<AnalyticJob> fetchAnalyticJobs()
}

// Fetch all succeeded apps
logger.info(" Resource Manager address "+_resourceManagerAddress);
URL succeededAppsURL =
new URL(new URL("http://" + _resourceManagerAddress), String.format(
"/ws/v1/cluster/apps?finalStatus=SUCCEEDED&finishedTimeBegin=%s&finishedTimeEnd=%s&" + rmApplicationFilter,
Expand Down Expand Up @@ -258,6 +261,7 @@ private List<AnalyticJob> readApps(URL url, boolean isSucceeded) throws IOExcept
String amContainerLogsURL = app.get("amContainerLogs").getValueAsText();
String amHostHttpAddress = app.get("amHostHttpAddress").getValueAsText();
String jobState = app.get("state").getValueAsText();
String projectName = app.get("applicationTags").getTextValue();

if (debugEnabled) {
logger.debug(" AM Container logs URL " + amContainerLogsURL);
Expand All @@ -269,8 +273,10 @@ private List<AnalyticJob> readApps(URL url, boolean isSucceeded) throws IOExcept
ElephantContext.instance().getApplicationTypeForName(app.get("applicationType").getValueAsText());


if(queueName.toLowerCase().equals("ump_normal") || queueName.toLowerCase().equals("ump_hp")){
//if (type != null && (projectName.contains("test_autotuning") || projectName.toUpperCase().contains("HBP_PIG_V1"))) {
AnalyticJob analyticJob = new AnalyticJob();
logger.info(" Analysis job " + analyticJob.getTrackingUrl());
// logger.info(" Analysis job " + analyticJob.getTrackingUrl());
analyticJob.setAppId(appId)
.setAppType(type)
.setUser(user)
Expand All @@ -284,7 +290,7 @@ private List<AnalyticJob> readApps(URL url, boolean isSucceeded) throws IOExcept
.setAmHostHttpAddress(amHostHttpAddress)
.setState(jobState);
appList.add(analyticJob);

}
}
}
return appList;
Expand Down
25 changes: 25 additions & 0 deletions app/com/linkedin/drelephant/analysis/code/Code.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.linkedin.drelephant.analysis.code;

import java.util.ArrayList;
import java.util.List;


/**
 * Holds the parsed representation of a job's source code as an ordered
 * list of {@link Statement} objects produced by a code analyzer.
 */
public class Code {
  private List<Statement> statements;

  /** Creates an empty code container with a mutable, empty statement list. */
  public Code() {
    this.statements = new ArrayList<>();
  }

  /**
   * @return the list of parsed statements (never {@code null} after construction
   *         unless a caller explicitly sets it so)
   */
  public List<Statement> getStatements() {
    return statements;
  }

  /**
   * Replaces the statement list wholesale.
   *
   * @param statements the new statement list
   */
  public void setStatements(List<Statement> statements) {
    this.statements = statements;
  }

  @Override
  public String toString() {
    return "Code{" + "statements=" + statements + '}';
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.linkedin.drelephant.analysis.code;

import java.io.IOException;
import org.apache.log4j.Logger;
import org.codehaus.jettison.json.JSONException;


/**
* This is wrapper for exceptions
*/
/**
 * Wrapper exception for all failures raised while extracting or analyzing job
 * source code. Each constructor wraps the underlying cause so callers can
 * catch a single exception type at the analysis boundary.
 *
 * <p>NOTE(review): logging inside exception constructors means every wrapped
 * failure is logged here AND wherever the exception is eventually caught
 * (e.g. AnalyticJob logs it again) — consider logging only at the catch site.
 */
public class CodeAnalyzerException extends Exception {
  // Exception is Serializable; pin the serial form explicitly.
  private static final long serialVersionUID = 1L;

  private static final Logger logger = Logger.getLogger(CodeAnalyzerException.class);

  /**
   * Wraps an I/O failure encountered while fetching code.
   *
   * @param exception the underlying I/O failure (preserved as the cause)
   */
  public CodeAnalyzerException(IOException exception) {
    super(exception);
    logger.error("Unable to get code ", exception);
  }

  /**
   * Wraps a JSON parsing failure from a code-service response.
   *
   * @param exception the underlying parse failure (preserved as the cause)
   */
  public CodeAnalyzerException(JSONException exception) {
    super(exception);
    logger.error("Unable to parse JSON ", exception);
  }

  /**
   * Catch-all wrapper so an unexpected failure in code analysis does not halt
   * the surrounding job-analysis pipeline.
   *
   * @param exception the underlying failure (preserved as the cause)
   */
  public CodeAnalyzerException(Exception exception) {
    super(exception);
    logger.error("Unknown exception have come , catching this , to not halt the system because of unknown exception ", exception);
  }
}
17 changes: 17 additions & 0 deletions app/com/linkedin/drelephant/analysis/code/CodeExtractor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.linkedin.drelephant.analysis.code;

import java.io.IOException;
import java.net.MalformedURLException;
import models.AppResult;
import org.codehaus.jettison.json.JSONException;


/**
 * Contract for locating and fetching the source code behind an analyzed
 * application. Implementations (e.g. AzkabanJarvisCodeExtractor) resolve the
 * code file name from an {@link AppResult} and then retrieve its code info.
 */
public interface CodeExtractor {
  /**
   * @param appResult the analysis result to inspect
   * @return true if enough information is present to resolve a code file name
   */
  boolean isCodeNameExtractible(AppResult appResult);

  /**
   * Resolves the code file name for the given analysis result.
   *
   * @param appResult the analysis result to inspect
   * @return the code file name
   * @throws MalformedURLException if a URL built during resolution is invalid
   */
  String codeFileNameExtractor(AppResult appResult) throws MalformedURLException;

  /**
   * Fetches code information (source, repo, SCM details) for the named file.
   *
   * @param codeFileName file name previously resolved by {@link #codeFileNameExtractor}
   * @return the populated code info data set
   * @throws IOException if the code service cannot be reached or read
   * @throws JSONException if the service response cannot be parsed
   */
  JobCodeInfoDataSet codeInfoExtractor(String codeFileName) throws IOException, JSONException;

  /**
   * End-to-end entry point: resolve the file name and fetch its code info,
   * wrapping any failure in a {@link CodeAnalyzerException}.
   *
   * @param appResult the analysis result to process
   * @return the populated code info data set (callers also handle null — see AnalyticJob)
   * @throws CodeAnalyzerException if any step of extraction fails
   */
  JobCodeInfoDataSet execute(AppResult appResult) throws CodeAnalyzerException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.linkedin.drelephant.analysis.code;

import com.linkedin.drelephant.analysis.Severity;


/**
 * A single optimization rule applied to a parsed {@link Script} — e.g. a
 * Hive/Pig anti-pattern check that attaches recommendation comments.
 */
public interface CodeOptimizationRule {
  /**
   * Applies this rule to the script, mutating it with any findings.
   *
   * @param script the parsed script to inspect and annotate
   */
  void processRule(Script script);

  /** @return a human-readable name identifying this rule */
  String getRuleName();

  /**
   * @return the severity of this rule's findings as a String
   *         (NOTE(review): the {@code Severity} enum is imported but unused —
   *         returning {@code Severity} instead would be more type-safe)
   */
  String getSeverity();
}
10 changes: 10 additions & 0 deletions app/com/linkedin/drelephant/analysis/code/CodeOptimizer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.linkedin.drelephant.analysis.code;

import com.linkedin.drelephant.analysis.Severity;


/**
 * Contract for a language-specific code optimizer (obtained from
 * CodeOptimizerFactory by file type) that parses source code and produces
 * a {@link Script} carrying optimization recommendations.
 */
public interface CodeOptimizer {
  /**
   * Parses the given input into a structured {@link Code} representation.
   *
   * @param inputFileName the source to parse (NOTE(review): the call site in
   *        AnalyticJob passes raw source code, not a file name — confirm and
   *        rename the parameter accordingly)
   * @return the parsed code representation
   */
  Code generateCode (String inputFileName);

  /**
   * Runs the full optimization pipeline over the given source and returns the
   * annotated script with its accumulated optimization comments.
   *
   * @param inputFileName the source to analyze (see naming note above)
   * @return the analyzed script
   */
  Script execute(String inputFileName);

  /** @return the overall severity determined by the last execution, as a String */
  String getSeverity();
}
83 changes: 83 additions & 0 deletions app/com/linkedin/drelephant/analysis/code/JobCodeInfoDataSet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.linkedin.drelephant.analysis.code;

/**
 * Value object carrying the code location and source information extracted
 * for a job: execution id, job/flow/project names, the code file name, the
 * raw source code, and SCM coordinates (type and repository name).
 */
public class JobCodeInfoDataSet {
  private String jobExecutionID = null;
  private String jobName = null;
  private String flowName = null;
  private String projectName = null;
  private String fileName = null;
  private String sourceCode = null;
  private String scmType = null;
  private String repoName = null;

  /** @return the SCM repository name, or null if unknown */
  public String getRepoName() {
    return repoName;
  }

  public void setRepoName(String repoName) {
    this.repoName = repoName;
  }

  /** @return the SCM type (e.g. git/svn — presumably; verify against extractor), or null */
  public String getScmType() {
    return scmType;
  }

  public void setScmType(String scmType) {
    this.scmType = scmType;
  }

  public String getJobName() {
    return jobName;
  }

  public void setJobName(String jobName) {
    this.jobName = jobName;
  }

  public String getFlowName() {
    return flowName;
  }

  public void setFlowName(String flowName) {
    this.flowName = flowName;
  }

  public String getProjectName() {
    return projectName;
  }

  public void setProjectName(String projectName) {
    this.projectName = projectName;
  }

  /** @return the code file name with {@code "src/"} segments stripped, or null */
  public String getFileName() {
    return fileName;
  }

  /**
   * Stores the code file name, stripping every {@code "src/"} occurrence from
   * the path.
   *
   * @param fileName raw file path from the code service; may be null
   */
  public void setFileName(String fileName) {
    // String.replace (not replaceAll) — the pattern is a plain literal, so no
    // regex engine is needed; both replace every occurrence, so behavior is
    // unchanged. The null-guard avoids an NPE when no file name was found.
    this.fileName = (fileName == null) ? null : fileName.replace("src/", "");
  }

  public String getSourceCode() {
    return sourceCode;
  }

  public void setSourceCode(String sourceCode) {
    this.sourceCode = sourceCode;
  }

  public String getJobExecutionID() {
    return jobExecutionID;
  }

  public void setJobExecutionID(String jobExecutionID) {
    this.jobExecutionID = jobExecutionID;
  }

  @Override
  public String toString() {
    // sourceCode and repoName are deliberately omitted — presumably to keep
    // log lines small; confirm before adding them.
    return "JobCodeInfoDataSet{" + "jobExecutionID='" + jobExecutionID + '\'' + ", jobName='" + jobName + '\''
        + ", flowName='" + flowName + '\'' + ", projectName='" + projectName + '\'' + ", fileName='" + fileName + '\''
        + ", scmType='" + scmType + '\'' + '}';
  }
}
Loading