From 7347887fd24d2a29faf0f2ca2653001e9469d6c5 Mon Sep 17 00:00:00 2001 From: Gabriel Iuhasz Date: Thu, 27 Jul 2017 12:39:15 +0300 Subject: [PATCH] added grid search and index checking --- adpengine/dmonadpengine.py | 17 +++++-- dataformatter.py | 5 +- dmonadp_test.ini => dmonadp.ini | 61 +++++++++++++++++++----- dmonconnector.py | 5 +- dmonscikit/dmonscilearnclassification.py | 33 +++++++++++-- requirement.txt | 6 +-- 6 files changed, 101 insertions(+), 26 deletions(-) rename dmonadp_test.ini => dmonadp.ini (54%) diff --git a/adpengine/dmonadpengine.py b/adpengine/dmonadpengine.py index 67ea4d1..7743766 100644 --- a/adpengine/dmonadpengine.py +++ b/adpengine/dmonadpengine.py @@ -540,6 +540,8 @@ def filterData(self, df, m=False): df = self.dformat.dropColumns(df, cfilterparse(self.dfilter)) # self.dformat.fillMissing(df) # Check for user defined categorical features + if df.index.name is None: + df.set_index('key', inplace=True) if self.categorical == 0: logger.info('[%s] : [INFO] Skipping categorical feature conversion', datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')) @@ -570,6 +572,8 @@ def trainMethod(self): udata = self.dformat.toDF(os.path.join(self.dataDir, 'query_response.csv')) elif 'cep' in queryd: udata = self.dformat.toDF(os.path.join(self.dataDir, 'CEP.csv')) + elif 'system' in queryd: + udata = self.dformat.toDF(os.path.join(self.dataDir, 'System.csv')) else: if 'yarn' in queryd: udata = yarnReturn @@ -585,6 +589,8 @@ def trainMethod(self): udata = userqueryReturn elif 'cep' in queryd: udata = cepQueryReturn + elif 'system' in queryd: + udata = systemReturn udata = self.filterData(udata) #todo check if self.type == 'clustering': if self.method in self.allowedMethodsClustering: @@ -931,13 +937,13 @@ def detectAnomalies(self): data.set_index('key', inplace=True) data = self.filterData(data) elif 'cep' in queryd: - cepQueryReturn = self.filterData(cepQueryReturn) + # cepQueryReturn = self.filterData(cepQueryReturn) if checkpoint: data = cepQueryReturn else: dataf = os.path.join(self.dataDir, 'CEP.csv') - data.set_index('key', inplace=True) data = self.dformat.toDF(dataf) + data.set_index('key', inplace=True) data = self.filterData(data) if self.method in self.allowefMethodsClassification: print "Detecting with selected method %s of type %s" % (self.method, self.type) @@ -1616,15 +1622,16 @@ def getCEP(self, detect=False): datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), type(inst), inst.args) sys.exit(1) if not dCepArray: - print "empty" - print gcep + print "CEP response is empty! Exiting ...." + logger.error('[%s] : [WARN] CEP response is empty!', + datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')) sys.exit(1) df = self.dformat.dtoDF(dCepArray) if not checkpoint: self.dformat.df2csv(df, os.path.join(self.dataDir, cep_file)) returnCEP = 0 else: - df.set_index('key', inplace=True) + # df.set_index('key', inplace=True) returnCEP = df print "Querying CEP metrics complete" logger.info('[%s] : [INFO] Querying CEP metrics complete', diff --git a/dataformatter.py b/dataformatter.py index 3a14b1c..6804b31 100644 --- a/dataformatter.py +++ b/dataformatter.py @@ -612,7 +612,10 @@ def ohEncoding(self, data, cols=None, replace=True): cols = [] for el, v in data.dtypes.iteritems(): if v == 'object': - cols.append(el) + if el == 'key': + pass + else: + cols.append(el) logger.info('[%s] : [INFO] Categorical features not set, detected as categorical: %s', datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), str(cols)) print "Categorical features not set, detected as categorical: %s" % str(cols) diff --git a/dmonadp_test.ini b/dmonadp.ini similarity index 54% rename from dmonadp_test.ini rename to dmonadp.ini index 1d7fb4a..1eb9a5f 100644 --- a/dmonadp_test.ini +++ b/dmonadp.ini @@ -28,23 +28,60 @@ Detect:false #Rows:ld:145607979;gd:145607979 #DColumns:colname;colname2;colname3 - [Detect] -Method:isoforest -Type:clustering -Export:test3 -Load:test2 +Method:randomforest +Type:classification +Export:cep1 +Load:cep2 +validratio:0.3 +compare:true + -;IsolationForest +;RandomForest [MethodSettings] -n_estimators:100 -max_samples:100 -contamination:0.01 -bootstrap:False -max_features:1.0 +n_estimators:10 +criterion:gini +max_features:auto +max_depth:None +min_sample_split:2 +min_sample_leaf:1 +min_weight_faction_leaf:0 +bootstrap:True n_jobs:1 random_state:None verbose:0 +iso_n_estimators:100 +iso_max_samples:auto +iso_contamination:0.1 +iso_bootstrap:True +iso_max_features:1.0 +iso_n_jobs:1 +iso_random_state:None +iso_verbose:0 +db_eps:0.8 +min_samples:10 +db_metric:euclidean +db_algorithm:auto +db_leaf_size:30 +db_p:0.2 +db_n_jobs:1 + +;[Detect] +;Method:isoforest +;Type:clustering +;Export:test3 +;Load:test2 +; +;;IsolationForest +;[MethodSettings] +;n_estimators:100 +;max_samples:100 +;contamination:0.01 +;bootstrap:False +;max_features:1.0 +;n_jobs:1 +;random_state:None +;verbose:0 ;SDBSCAN ;[MethodSettings] @@ -70,7 +107,7 @@ Network: tx:gd:34344;rx:ld:323434 [Misc] heap:512m -checkpoint:false +checkpoint:true delay:15s interval:30m resetindex:false \ No newline at end of file diff --git a/dmonconnector.py b/dmonconnector.py index dd7c679..088de31 100644 --- a/dmonconnector.py +++ b/dmonconnector.py @@ -38,7 +38,7 @@ def __init__(self, esEndpoint, dmonPort=5001, esInstanceEndpoint=9200, index="lo self.myIndex = index def query(self, queryBody, allm=True, dMetrics=[], debug=False): - res = self.esInstance.search(index=self.myIndex, body=queryBody) + res = self.esInstance.search(index=self.myIndex, body=queryBody, request_timeout=230) if debug == True: print "%---------------------------------------------------------%" print "Raw JSON Ouput" @@ -210,8 +210,9 @@ def getInterval(self): return rData def aggQuery(self, queryBody): + adt_timeout = os.environ['ADP_TIMEOUT'] = os.getenv('ADP_TIMEOUT', str(60)) # Set timeout as env variable ADT_TIMEOUT, if not set use default 60 try: - res = self.esInstance.search(index=self.myIndex, body=queryBody) + res = self.esInstance.search(index=self.myIndex, body=queryBody, request_timeout=float(adt_timeout)) except Exception as inst: logger.error('[%s] : [ERROR] Exception while executing ES query with %s and %s', datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), type(inst), inst.args) sys.exit(2) diff --git a/dmonscikit/dmonscilearnclassification.py b/dmonscikit/dmonscilearnclassification.py index 48b596b..f0abb62 100644 --- a/dmonscikit/dmonscilearnclassification.py +++ b/dmonscikit/dmonscilearnclassification.py @@ -9,9 +9,11 @@ from sklearn.ensemble import IsolationForest from sklearn.ensemble import RandomForestClassifier from sklearn.tree import DecisionTreeClassifier, export_graphviz +from sklearn.neural_network import MLPClassifier from sklearn import model_selection from sklearn.ensemble import AdaBoostClassifier from sklearn.preprocessing import StandardScaler +from sklearn.grid_search import GridSearchCV from sklearn.cluster import DBSCAN import cPickle as pickle from util import str2Bool @@ -165,7 +167,7 @@ def decisionTree(self, settings, data=None, dropna=True): min_weight_fraction_leaf=float(settings["min_weight_faction_leaf"]), random_state=settings["random_state"]) if self.validratio: trainSize = 1.0 - self.validratio - print "Random forest training to validation ratio set to: %s" % str(self.validratio) + print "Decision Tree training to validation ratio set to: %s" % str(self.validratio) logger.info('[%s] : [INFO] Random forest training to validation ratio set to: %s', datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), str(self.validratio)) d_train, d_test, f_train, f_test = self.__dataSplit(X, y, testSize=self.validratio, trainSize=trainSize) @@ -401,6 +403,7 @@ def trainingDataGen(self, settings, data=None, dropna=True, onlyAno=True): logger.info('[%s] : [INFO] Starting training data generation ...', datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')) df = self.__loadData(data, dropna) + print df.index.name if df.index.name is None: df.set_index('key', inplace=True) logger.info('[%s] : [INFO] Input Dataframe shape: %s', @@ -557,10 +560,17 @@ def trainingDataGen(self, settings, data=None, dropna=True, onlyAno=True): # print data_labeled[['Target', 'Target2']] # initialize empty column df['TargetF'] = np.nan - + print df.shape + print anomalyFrame.shape # add clustered anomalies to original dataframe for k in anomalyFrame.index.values: - df.set_value(k, 'TargetF', anomalyFrame.loc[k, 'Target2']) + try: + df.set_value(k, 'TargetF', anomalyFrame.loc[k, 'Target2']) + except Exception as inst: + print inst.args + print type(inst) + print k + sys.exit() # Mark all normal instances as 0 df = df.fillna(0) if df.isnull().values.any(): @@ -591,10 +601,27 @@ def __loadData(self, data=None, dropna=True): datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), str(type(data))) sys.exit(1) df = data + if df.index.name is None: + df.set_index('key', inplace=True) if dropna: df = df.dropna() return df + def __gridSearch(self, est, X, y): + if isinstance(est, RandomForestClassifier): + param_grid = { + 'n_estimators': [200, 300, 400, 500, 800, 1000], + 'max_features': ['auto', 'sqrt', 'log2'], + 'max_depth': [5, 15, 25] + } + + CV_rfc = GridSearchCV(estimator=est, param_grid=param_grid, cv=5) + CV_rfc.fit(X, y) + logger.info('[%s] : [INFO] Best parameters are: %s', + datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), CV_rfc.best_params_) + print 'Best parameters are: %s' % CV_rfc.best_params_ + return CV_rfc.best_params_ + def __loadClassificationModel(self, method, model): ''' :param method: -> method name diff --git a/requirement.txt b/requirement.txt index 7439777..d48241a 100644 --- a/requirement.txt +++ b/requirement.txt @@ -14,7 +14,7 @@ Flask-Mail==0.9.0 Flask-Migrate==1.2.0 Flask-Moment==0.3.3 Flask-OpenID==1.2.4 -flask-rdf==0.1.6 +flask-rdf Flask-RESTful==0.3.4 flask-restful-swagger==0.19 flask-restplus==0.7.2 @@ -36,7 +36,7 @@ iso8601==0.1.11 isodate==0.5.1 itsdangerous==0.24 javabridge==1.0.14 -jedi===0.8.1-final0 +jedi==0.8.1-final Jinja2==2.8 jsonpatch==1.13 jsonpointer==1.10 @@ -51,7 +51,7 @@ msgpack-python==0.4.7 netaddr==0.7.18 netifaces==0.10.4 networkx==1.10 -numpy==1.11.0 +numpy openopt==0.5308 os-client-config==1.16.0 oslo.config==1.9.3