From 58867178f02210191a68f4c2bf966341027dd708 Mon Sep 17 00:00:00 2001 From: Gabriel Iuhasz Date: Sun, 25 Jun 2017 10:07:47 +0300 Subject: [PATCH] added detect method to sciclassification --- adpengine/dmonadpengine.py | 74 +++++++++++++++++++++++- dmonadp.ini => dmonadp_test.ini | 26 +++++---- dmonscikit/dmonscilearnclassification.py | 19 +++--- 3 files changed, 97 insertions(+), 22 deletions(-) rename dmonadp.ini => dmonadp_test.ini (83%) diff --git a/adpengine/dmonadpengine.py b/adpengine/dmonadpengine.py index d39f0c5..67ea4d1 100644 --- a/adpengine/dmonadpengine.py +++ b/adpengine/dmonadpengine.py @@ -901,8 +901,78 @@ def detectAnomalies(self): self.type) sys.exit(1) elif self.type == 'classification': - print "Not yet supported!" # TODO - sys.exit(0) + while True: + print "Collect data ..." + systemReturn, yarnReturn, reducemetrics, mapmetrics, mrapp, sparkReturn, stormReturn, cassandraReturn, mongoReturn, userQueryReturn, cepQueryReturn = self.getData( + detect=True) + if 'yarn' in queryd: + # yarnReturn = self.filterData(yarnReturn) # todo + if checkpoint: + data = yarnReturn + else: + dataf = os.path.join(self.dataDir, 'Final_Merge.csv') + data = self.dformat.toDF(dataf) + data.set_index('key', inplace=True) + data = self.filterData(data) + elif 'storm' in queryd: + if checkpoint: + data = stormReturn + else: + dataf = os.path.join(self.dataDir, 'Storm.csv') + data = self.dformat.toDF(dataf) + data.set_index('key', inplace=True) + data = self.filterData(data) + elif 'userquery' in queryd: + if checkpoint: + data = userQueryReturn + else: + dataf = os.path.join(self.dataDir, 'query_response.csv') + data = self.dformat.toDF(dataf) + data.set_index('key', inplace=True) + data = self.filterData(data) + elif 'cep' in queryd: + cepQueryReturn = self.filterData(cepQueryReturn) + if checkpoint: + data = cepQueryReturn + else: + dataf = os.path.join(self.dataDir, 'CEP.csv') + data = self.dformat.toDF(dataf) + data.set_index('key', inplace=True) + data = 
self.filterData(data) + if self.method in self.allowefMethodsClassification: + print "Detecting with selected method %s of type %s" % (self.method, self.type) + if os.path.isfile(os.path.join(self.modelsDir, self.modelName(self.method, self.load))): + print "Model found at %s" % str( + os.path.join(self.modelsDir, self.modelName(self.method, self.load))) + cmodel = cdmon.SciClassification(self.modelsDir, self.dataDir, self.checkpoint, self.export, + training=self.trainingSet, validation=self.validationSet, + validratio=self.validratio, compare=self.compare) + anomalies = cmodel.detect(self.method, self.load, data) + if not anomalies['anomalies']: + logger.info('[%s] : [INFO] No anomalies detected with %s', + datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), str(self.method)) + print "No anomalies detected with %s" % str(self.method) + sleep(parseDelay(self.delay)) + else: + anomalies['method'] = self.method + anomalies['qinterval'] = self.qinterval + self.reportAnomaly(anomalies) + sleep(parseDelay(self.delay)) + else: + logger.error('[%s] : [ERROR] Model %s not found at %s ', + datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), self.load, + str(os.path.join(self.modelsDir, self.modelName(self.method, self.load)))) + print "Model not found %s" % self.modelName(self.method, self.load) + sys.exit(1) + + else: + print "Unknown method %s of type %s" % (self.method, self.type) + logger.error('[%s] : [ERROR] Unknown method %s of type %s ', + datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), self.method, + self.type) + sys.exit(1) + + # sys.exit(0) else: logger.error('[%s] : [ERROR] Unknown type %s ', datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), self.type) diff --git a/dmonadp.ini b/dmonadp_test.ini similarity index 83% rename from dmonadp.ini rename to dmonadp_test.ini index 741843e..1d7fb4a 100644 --- a/dmonadp.ini +++ b/dmonadp_test.ini @@ -6,19 +6,21 @@ DMonPort:5001 ;DMonPort:5002 
;From:1490358433793 ;To:1490359333793 -From:1490785940000 -To:1490790447003 +From:1498057785356 +To:1498144185356 ;Query:yarn:cluster, nn, nm, dfs, dn, mr;system -Query:storm -Query:userquery +Query:cep +;Query:storm Nodes: -QSize:0 +QSize:500 QInterval:30s +Categorical:0 + [Mode] -Training:false +Training:true Validate:False -Detect:true +Detect:false [Filter] @@ -30,8 +32,8 @@ Detect:true [Detect] Method:isoforest Type:clustering -Export:test1 -Load:test1 +Export:test3 +Load:test2 ;IsolationForest [MethodSettings] @@ -68,7 +70,7 @@ Network: tx:gd:34344;rx:ld:323434 [Misc] heap:512m -checkpoint:true -delay:30s -interval:15m +checkpoint:false +delay:15s +interval:30m resetindex:false \ No newline at end of file diff --git a/dmonscikit/dmonscilearnclassification.py b/dmonscikit/dmonscilearnclassification.py index 194ecb7..48b596b 100644 --- a/dmonscikit/dmonscilearnclassification.py +++ b/dmonscikit/dmonscilearnclassification.py @@ -31,7 +31,7 @@ def __init__(self, modelDir, dataDir, checkpoint, export, training, validation, self.compare = compare def detect(self, method, model, data): - smodel = self.__loadClusterModel(method, model) + smodel = self.__loadClassificationModel(method, model) anomalieslist = [] if not smodel: dpredict = 0 @@ -70,13 +70,16 @@ def detect(self, method, model, data): str(data.shape[1])) print "dpredict type is %s" % (type(dpredict)) if type(dpredict) is not int: - # TODO change so all normal events have tag 0 - anomalyarray = np.argwhere(dpredict != 0) - for an in anomalyarray: + data['AType'] = dpredict + for index, row in data.iterrows(): anomalies = {} - anomalies['utc'] = int(data.iloc[an[0]]['key']) - anomalies['hutc'] = ut2hum(int(data.iloc[an[0]]['key'])) - anomalieslist.append(anomalies) + if row['AType'] != 0: + print index + print data.get_value(index, 'AType') + anomalies['utc'] = int(index) + anomalies['hutc'] = ut2hum(int(index)) + anomalies['anomaly_type'] = data.get_value(index, 'AType') + anomalieslist.append(anomalies) 
anomaliesDict = {} anomaliesDict['anomalies'] = anomalieslist logger.info('[%s] : [INFO] Detected anomalies with model %s using method %s are -> %s', @@ -592,7 +595,7 @@ def __loadData(self, data=None, dropna=True): df = df.dropna() return df - def __loadClusterModel(self, method, model): + def __loadClassificationModel(self, method, model): ''' :param method: -> method name :param model: -> model name