Commit

added detect method to sciclassification
igabriel85 committed Jun 25, 2017
1 parent d2b4eac commit 5886717
Showing 3 changed files with 97 additions and 22 deletions.
74 changes: 72 additions & 2 deletions adpengine/dmonadpengine.py
@@ -901,8 +901,78 @@ def detectAnomalies(self):
                                 self.type)
                    sys.exit(1)
        elif self.type == 'classification':
            print "Not yet supported!" # TODO
            sys.exit(0)
            while True:
                print "Collect data ..."
                systemReturn, yarnReturn, reducemetrics, mapmetrics, mrapp, sparkReturn, stormReturn, cassandraReturn, mongoReturn, userQueryReturn, cepQueryReturn = self.getData(
                    detect=True)
                if 'yarn' in queryd:
                    # yarnReturn = self.filterData(yarnReturn) # todo
                    if checkpoint:
                        data = yarnReturn
                    else:
                        dataf = os.path.join(self.dataDir, 'Final_Merge.csv')
                        data = self.dformat.toDF(dataf)
                        data.set_index('key', inplace=True)
                    data = self.filterData(data)
                elif 'storm' in queryd:
                    if checkpoint:
                        data = stormReturn
                    else:
                        dataf = os.path.join(self.dataDir, 'Storm.csv')
                        data = self.dformat.toDF(dataf)
                        data.set_index('key', inplace=True)
                    data = self.filterData(data)
                elif 'userquery' in queryd:
                    if checkpoint:
                        data = userQueryReturn
                    else:
                        dataf = os.path.join(self.dataDir, 'query_response.csv')
                        data = self.dformat.toDF(dataf)
                        data.set_index('key', inplace=True)
                    data = self.filterData(data)
                elif 'cep' in queryd:
                    cepQueryReturn = self.filterData(cepQueryReturn)
                    if checkpoint:
                        data = cepQueryReturn
                    else:
                        dataf = os.path.join(self.dataDir, 'CEP.csv')
                        data = self.dformat.toDF(dataf)
                        data.set_index('key', inplace=True)
                        data = self.filterData(data)
                if self.method in self.allowefMethodsClassification:
                    print "Detecting with selected method %s of type %s" % (self.method, self.type)
                    if os.path.isfile(os.path.join(self.modelsDir, self.modelName(self.method, self.load))):
                        print "Model found at %s" % str(
                            os.path.join(self.modelsDir, self.modelName(self.method, self.load)))
                        cmodel = cdmon.SciClassification(self.modelsDir, self.dataDir, self.checkpoint, self.export,
                                                         training=self.trainingSet, validation=self.validationSet,
                                                         validratio=self.validratio, compare=self.compare)
                        anomalies = cmodel.detect(self.method, self.load, data)
                        if not anomalies['anomalies']:
                            logger.info('[%s] : [INFO] No anomalies detected with %s',
                                        datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), str(self.method))
                            print "No anomalies detected with %s" % str(self.method)
                            sleep(parseDelay(self.delay))
                        else:
                            anomalies['method'] = self.method
                            anomalies['qinterval'] = self.qinterval
                            self.reportAnomaly(anomalies)
                            sleep(parseDelay(self.delay))
                    else:
                        logger.error('[%s] : [ERROR] Model %s not found at %s ',
                                     datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), self.load,
                                     str(os.path.join(self.modelsDir, self.modelName(self.method, self.load))))
                        print "Model not found %s" % self.modelName(self.method, self.load)
                        sys.exit(1)

                else:
                    print "Unknown method %s of type %s" % (self.method, self.type)
                    logger.error('[%s] : [ERROR] Unknown method %s of type %s ',
                                 datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), self.method,
                                 self.type)
                    sys.exit(1)

            # sys.exit(0)
        else:
            logger.error('[%s] : [ERROR] Unknown type %s ',
                         datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), self.type)
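For readers outside the repo, the new branch boils down to: pick a data source per query, reuse the in-memory frame when checkpointing or reload the CSV the engine exported earlier, filter it, then hand the frame to the classifier. A minimal standalone sketch of that source-selection step, using plain pandas in place of the engine's dformat helper (the select_data function and the frames dict are illustrative, not part of the codebase):

import os

import pandas as pd


def select_data(query, frames, data_dir, checkpoint):
    # Map each supported query keyword to the CSV the engine writes for it
    # (file names as they appear in the diff above).
    sources = {
        'yarn': 'Final_Merge.csv',
        'storm': 'Storm.csv',
        'userquery': 'query_response.csv',
        'cep': 'CEP.csv',
    }
    for key, csv_name in sources.items():
        if key in query:
            if checkpoint:
                # Checkpointing keeps the frame in memory, so reuse it directly.
                data = frames[key]
            else:
                # Otherwise reload the exported CSV and index it by the timestamp key.
                data = pd.read_csv(os.path.join(data_dir, csv_name))
                data.set_index('key', inplace=True)
            return data
    raise ValueError("no supported query found in %r" % (query,))

With checkpointing off this mirrors the else branches above; with it on, the frames returned by getData are used as-is.
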
26 changes: 14 additions & 12 deletions dmonadp.ini → dmonadp_test.ini
@@ -6,19 +6,21 @@ DMonPort:5001
;DMonPort:5002
;From:1490358433793
;To:1490359333793
From:1490785940000
To:1490790447003
From:1498057785356
To:1498144185356
;Query:yarn:cluster, nn, nm, dfs, dn, mr;system
Query:storm
Query:userquery
Query:cep
;Query:storm
Nodes:
QSize:0
QSize:500
QInterval:30s
Categorical:0


[Mode]
Training:false
Training:true
Validate:False
Detect:true
Detect:false


[Filter]
@@ -30,8 +32,8 @@ Detect:true
[Detect]
Method:isoforest
Type:clustering
Export:test1
Load:test1
Export:test3
Load:test2

;IsolationForest
[MethodSettings]
@@ -68,7 +70,7 @@ Network: tx:gd:34344;rx:ld:323434

[Misc]
heap:512m
checkpoint:true
delay:30s
interval:15m
checkpoint:false
delay:15s
interval:30m
resetindex:false
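For context, the renamed dmonadp_test.ini drives the run mode: [Mode] switches the engine between training and detection, [Detect] picks the method and the model names to export/load, and [Misc] controls checkpointing and the polling delay. A rough sketch of reading those options with the standard-library configparser (Python 3 name; ConfigParser on Python 2), with strict=False to tolerate the duplicated Query lines visible above; the engine's own config loader may differ:

import configparser

config = configparser.ConfigParser(strict=False)
config.read('dmonadp_test.ini')

training = config.getboolean('Mode', 'Training')    # true -> train a model
detect = config.getboolean('Mode', 'Detect')        # true -> run the detection loop
method = config.get('Detect', 'Method')             # e.g. isoforest
dtype = config.get('Detect', 'Type')                # clustering or classification
load = config.get('Detect', 'Load')                 # model name to load, e.g. test2
checkpoint = config.getboolean('Misc', 'checkpoint')
delay = config.get('Misc', 'delay')                 # e.g. 15s, parsed by the engine
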
19 changes: 11 additions & 8 deletions dmonscikit/dmonscilearnclassification.py
@@ -31,7 +31,7 @@ def __init__(self, modelDir, dataDir, checkpoint, export, training, validation,
        self.compare = compare

    def detect(self, method, model, data):
        smodel = self.__loadClusterModel(method, model)
        smodel = self.__loadClassificationModel(method, model)
        anomalieslist = []
        if not smodel:
            dpredict = 0
@@ -70,13 +70,16 @@ def detect(self, method, model, data):
                        str(data.shape[1]))
        print "dpredict type is %s" % (type(dpredict))
        if type(dpredict) is not int:
            # TODO change so all normal events have tag 0
            anomalyarray = np.argwhere(dpredict != 0)
            for an in anomalyarray:
            data['AType'] = dpredict
            for index, row in data.iterrows():
                anomalies = {}
                anomalies['utc'] = int(data.iloc[an[0]]['key'])
                anomalies['hutc'] = ut2hum(int(data.iloc[an[0]]['key']))
                anomalieslist.append(anomalies)
                if row['AType'] != 0:
                    print index
                    print data.get_value(index, 'AType')
                    anomalies['utc'] = int(index)
                    anomalies['hutc'] = ut2hum(int(index))
                    anomalies['anomaly_type'] = data.get_value(index, 'AType')
                    anomalieslist.append(anomalies)
        anomaliesDict = {}
        anomaliesDict['anomalies'] = anomalieslist
        logger.info('[%s] : [INFO] Detected anomalies with model %s using method %s are -> %s',
@@ -592,7 +595,7 @@ def __loadData(self, data=None, dropna=True):
            df = df.dropna()
        return df

    def __loadClusterModel(self, method, model):
    def __loadClassificationModel(self, method, model):
        '''
        :param method: -> method name
        :param model: -> model name
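The change to detect is the core of the commit: instead of locating anomalies with np.argwhere over the prediction array, every row is tagged with its predicted class in an AType column and only rows with a non-zero (non-normal) label are reported, now including the predicted anomaly type. A self-contained sketch of that pattern with plain pandas, assuming millisecond UTC timestamps in the index and standing in for the repo's ut2hum helper (not shown here) with datetime formatting:

from datetime import datetime

import numpy as np
import pandas as pd


def collect_anomalies(data, dpredict):
    # data: DataFrame indexed by UTC milliseconds; dpredict: per-row labels,
    # with 0 treated as the normal class (as in the diff above).
    data = data.copy()
    data['AType'] = dpredict
    anomalieslist = []
    for index, row in data.iterrows():
        if row['AType'] != 0:
            anomalieslist.append({
                'utc': int(index),
                'hutc': datetime.utcfromtimestamp(int(index) / 1000.0).strftime('%Y-%m-%d %H:%M:%S'),
                'anomaly_type': int(row['AType']),
            })
    return {'anomalies': anomalieslist}


# Example: two of the three rows carry a non-zero label and end up in the report.
frame = pd.DataFrame({'load': [0.2, 0.9, 0.8]},
                     index=[1498057785000, 1498057815000, 1498057845000])
print(collect_anomalies(frame, np.array([0, 2, 1])))
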
