Commit

Added support for CEP use case
igabriel85 committed Jun 23, 2017
1 parent f042b64 commit d2b4eac
Showing 4 changed files with 298 additions and 9 deletions.
87 changes: 81 additions & 6 deletions adpengine/dmonadpengine.py
@@ -71,6 +71,7 @@ def __init__(self, settingsDict, dataDir, modelsDir, queryDir):
self.reducemetrics = 0
self.mrapp = 0
self.userQueryReturn = 0
self.cepQueryReturn = 0
self.dataNodeTraining = 0
self.dataNodeDetecting = 0

@@ -483,7 +484,9 @@ def getData(self, detect=False):
self.mongoReturn = self.getMongodb(desNodes, detect=detect)
elif 'userquery' in queryd:
self.userQueryReturn = self.getQuery(detect=detect)
return self.systemReturn, self.yarnReturn, self.reducemetrics, self.mapmetrics, self.mrapp, self.sparkReturn, self.stormReturn, self.cassandraReturn, self.mongoReturn, self.userQueryReturn
elif 'cep' in queryd:
self.cepQueryReturn = self.getCEP(detect=detect)
return self.systemReturn, self.yarnReturn, self.reducemetrics, self.mapmetrics, self.mrapp, self.sparkReturn, self.stormReturn, self.cassandraReturn, self.mongoReturn, self.userQueryReturn, self.cepQueryReturn

def filterData(self, df, m=False):
'''
@@ -551,7 +554,7 @@ def trainMethod(self):
print "Getting data ..."
checkpoint = str2Bool(self.checkpoint)
queryd = queryParser(self.query)
systemReturn, yarnReturn, reducemetrics, mapmetrics, mrapp, sparkReturn, stormReturn, cassandraReturn, mongoReturn, userqueryReturn = self.getData()
systemReturn, yarnReturn, reducemetrics, mapmetrics, mrapp, sparkReturn, stormReturn, cassandraReturn, mongoReturn, userqueryReturn, cepQueryReturn = self.getData()
if not checkpoint:
if 'yarn' in queryd:
udata = self.dformat.toDF(os.path.join(self.dataDir, 'Final_Merge.csv'))
@@ -565,6 +568,8 @@ def trainMethod(self):
return "not yet implemented" # todo
elif 'userquery' in queryd:
udata = self.dformat.toDF(os.path.join(self.dataDir, 'query_response.csv'))
elif 'cep' in queryd:
udata = self.dformat.toDF(os.path.join(self.dataDir, 'CEP.csv'))
else:
if 'yarn' in queryd:
udata = yarnReturn
@@ -578,6 +583,8 @@ def trainMethod(self):
return "not yet implemented" # todo
elif 'userquery' in queryd:
udata = userqueryReturn
elif 'cep' in queryd:
udata = cepQueryReturn
udata = self.filterData(udata) #todo check
if self.type == 'clustering':
if self.method in self.allowedMethodsClustering:
@@ -605,6 +612,8 @@ def trainMethod(self):
dataf = os.path.join(self.dataDir, 'Merged_Mongo.csv')
elif 'userquery' in queryd:
dataf = os.path.join(self.dataDir, 'query_response.csv')
elif 'cep' in queryd:
dataf = os.path.join(self.dataDir, 'CEP.csv')
data = dataf
if self.method == 'skm':
print "Method %s settings detected -> %s" % (self.method, str(self.methodSettings))
@@ -814,7 +823,7 @@ def detectAnomalies(self):
if self.type == 'clustering':
while True:
print "Collect data ..."
systemReturn, yarnReturn, reducemetrics, mapmetrics, mrapp, sparkReturn, stormReturn, cassandraReturn, mongoReturn, userQueryReturn = self.getData(detect=True)
systemReturn, yarnReturn, reducemetrics, mapmetrics, mrapp, sparkReturn, stormReturn, cassandraReturn, mongoReturn, userQueryReturn, cepQueryReturn = self.getData(detect=True)
# if list(set(self.dformat.fmHead) - set(list(yarnReturn.columns.values))):
# print "Mismatch between desired and loaded data"
# sys.exit()
@@ -834,19 +843,27 @@ def detectAnomalies(self):
dataf = os.path.join(self.dataDir, 'Final_Merge.csv')
data = dataf
elif 'storm' in queryd:
stormReturn = self.filterData(stormReturn)
if checkpoint:
data = stormReturn
else:
dataf = os.path.join(self.dataDir, 'Storm.csv')
data = dataf
data = self.filterData(data)
elif 'userquery' in queryd:
userQueryReturn = self.filterData(userQueryReturn)
if checkpoint:
data = userQueryReturn
else:
dataf = os.path.join(self.dataDir, 'query_response.csv')
data = dataf
data = self.dformat.toDF(dataf)
data = self.filterData(data)
elif 'cep' in queryd:
cepQueryReturn = self.filterData(cepQueryReturn)
if checkpoint:
data = cepQueryReturn
else:
dataf = os.path.join(self.dataDir, 'CEP.csv')
data = self.dformat.toDF(dataf)
data = self.filterData(data)
if self.method in self.allowedMethodsClustering:
print "Detecting with selected method %s of type %s" % (self.method, self.type)
if os.path.isfile(os.path.join(self.modelsDir, self.modelName(self.method, self.load))):
@@ -1487,6 +1504,64 @@ def getQuery(self, detect=False):
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))
return returnUQ

def getCEP(self, detect=False):
if detect:
tfrom = "now-%s" % self.interval
to = "now"
else:
tfrom = self.tfrom
to = self.to
print "Querying CEP metrics ..."
logger.info('[%s] : [INFO] Querying CEP metrics ...',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))
checkpoint = str2Bool(self.checkpoint)
cep, cep_file = self.qConstructor.cepQueryString()

# Queries
qcep = self.qConstructor.cepQuery(cep, tfrom, to, self.qsize, self.interval, qmin_doc_count=0)


# Execute query and convert response to csv
respMetrics, gcep = self.dmonConnector.query(queryBody=qcep)
print gcep
dCepArray = []
try:
for el in gcep['hits']['hits']:
try:
dCep = {}
dCep['ms'] = el['_source']['ms']
dCep['key'] = el['_source']['@timestamp']
dCep['component'] = el['_source']['Component']
dCep['host'] = el['_source']['host']
dCep['ship'] = el['_source']['ship']
dCep['method'] = el['_source']['method']
dCepArray.append(dCep)
except Exception as inst:
print 'Failed to parse CEP response!'
logger.warning('[%s] : [WARN] Failed to parse CEP response with %s and %s',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), type(inst), inst.args)
except Exception as inst:
print 'Malformed CEP response detected. Exiting!'
logger.error('[%s] : [ERROR] Malformed CEP response detected with %s and %s',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), type(inst), inst.args)
sys.exit(1)
if not dCepArray:
print "empty"
print gcep
sys.exit(1)
df = self.dformat.dtoDF(dCepArray)
if not checkpoint:
self.dformat.df2csv(df, os.path.join(self.dataDir, cep_file))
returnCEP = 0
else:
df.set_index('key', inplace=True)
returnCEP = df
print "Querying CEP metrics complete"
logger.info('[%s] : [INFO] Querying CEP metrics complete',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))

return returnCEP

def printTest(self):
print "Endpoint -> %s" %self.esendpoint
print "Method settings -> %s" %self.methodSettings
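
For reference, here is a minimal, self-contained sketch of the conversion that getCEP performs on the Elasticsearch response before handing the data to the detection pipeline. It uses pandas directly, whereas the engine delegates to self.dformat.dtoDF, which is assumed to wrap an equivalent list-of-dicts conversion; the sample response document and its field values are hypothetical.

import pandas as pd

# Hypothetical response shaped like the 'gcep' object getCEP iterates over;
# the field names ('ms', '@timestamp', 'Component', 'host', 'ship', 'method')
# are the ones the parsing loop expects.
gcep = {
    "hits": {
        "hits": [
            {"_source": {"ms": 120, "@timestamp": "2017-06-23T10:00:00.000Z",
                         "Component": "cep-engine", "host": "dice-vm-1",
                         "ship": "posidonia-1", "method": "detect"}}
        ]
    }
}

dCepArray = []
for el in gcep['hits']['hits']:
    src = el['_source']
    dCepArray.append({
        'ms': src['ms'],
        'key': src['@timestamp'],      # becomes the DataFrame index when checkpointing
        'component': src['Component'],
        'host': src['host'],
        'ship': src['ship'],
        'method': src['method'],
    })

# Rough equivalent of self.dformat.dtoDF(dCepArray) followed by the checkpoint branch
df = pd.DataFrame(dCepArray)
df.set_index('key', inplace=True)
print(df)
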
37 changes: 37 additions & 0 deletions pyQueryConstructor.py
@@ -147,6 +147,11 @@ def sideQueryString(self):
file = "query_response.csv"
return file

def cepQueryString(self):
qstring = "type:cep-posidonia AND DComp:DMON"
file = "CEP.csv"
return qstring, file

def loadAverage(self): # TODO
return "Average load across all nodes!"

@@ -881,6 +886,38 @@ def mongoDBGaugeQuery(self, qstring, qgte, qlte, qsize, qinterval, wildCard=True
cqueryd = cquery.to_dict()
return cqueryd

def cepQuery(self, qstring, qgte, qlte, qsize, qinterval, wildCard=True, qtformat="epoch_millis",
qmin_doc_count=1):
cquery = Dict()
cquery.highlight.pre_tags = ["@kibana-highlighted-field@"]
cquery.highlight.post_tags = ["@/kibana-highlighted-field@"]
cquery.highlight.fields = {"*":{}}
cquery.highlight.require_field_match = False
cquery.highlight.fragment_size = 2147483647

cquery.query.filtered.query.query_string.query = qstring
cquery.query.filtered.query.query_string.analyze_wildcard = wildCard
cquery.query.filtered.filter.bool.must = [
{"range": {"@timestamp": {"gte": qgte, "lte": qlte, "format": qtformat}}}]
cquery.query.filtered.filter.bool.must_not = []

cquery.sort = [{"@timestamp": {"order": "desc", "unmapped_type": "boolean"}}]
cquery.size = qsize

cquery.aggs["2"].date_histogram.field = "@timestamp"
cquery.aggs["2"].date_histogram.interval = qinterval
cquery.aggs["2"].date_histogram.time_zone = "Europe/Helsinki"
cquery.aggs["2"].date_histogram.min_doc_count = qmin_doc_count
cquery.aggs["2"].date_histogram.extended_bounds.min = qgte
cquery.aggs["2"].date_histogram.extended_bounds.max = qlte

cquery.fields = ["*", "_source"]
cquery.script_fields = {}
cquery.fielddata_fields = ["@timestamp"]
cqueryd = cquery.to_dict()
return cqueryd


def sparkQuery(self):
return "Spark metrics query"

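
For clarity, the request body that cepQuery assembles through the Dict helper corresponds roughly to the plain dictionary below, using the legacy Elasticsearch filtered-query syntax. The timestamp bounds, size and interval values are illustrative stand-ins for the qgte, qlte, qsize and qinterval arguments; the query string is the one returned by cepQueryString.

# Approximate request body produced by cepQuery (illustrative values)
cep_body = {
    "highlight": {
        "pre_tags": ["@kibana-highlighted-field@"],
        "post_tags": ["@/kibana-highlighted-field@"],
        "fields": {"*": {}},
        "require_field_match": False,
        "fragment_size": 2147483647,
    },
    "query": {
        "filtered": {
            "query": {
                "query_string": {
                    "query": "type:cep-posidonia AND DComp:DMON",
                    "analyze_wildcard": True,
                }
            },
            "filter": {
                "bool": {
                    "must": [{"range": {"@timestamp": {
                        "gte": 1498204800000,        # qgte
                        "lte": 1498205100000,        # qlte
                        "format": "epoch_millis"}}}],
                    "must_not": [],
                }
            },
        }
    },
    "sort": [{"@timestamp": {"order": "desc", "unmapped_type": "boolean"}}],
    "size": 500,                                     # qsize
    "aggs": {"2": {"date_histogram": {
        "field": "@timestamp",
        "interval": "10s",                           # qinterval
        "time_zone": "Europe/Helsinki",
        "min_doc_count": 0,                          # qmin_doc_count as passed by getCEP
        "extended_bounds": {"min": 1498204800000, "max": 1498205100000},
    }}},
    "fields": ["*", "_source"],
    "script_fields": {},
    "fielddata_fields": ["@timestamp"],
}
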
