Merge pull request #327 from elastic/joshdevins/ml-search-metrics
Add simulation for online search relevance metrics
joshdevins authored Jun 25, 2020
2 parents 1b0e1cc + d3699c2 commit 2b6a670
Showing 33 changed files with 4,680 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Machine Learning/Online Search Relevance Metrics/.gitignore
@@ -0,0 +1,3 @@
*.pyc
/notebooks/.ipynb_checkpoints
/venv/
33 changes: 33 additions & 0 deletions Machine Learning/Online Search Relevance Metrics/Makefile
@@ -0,0 +1,33 @@
default: test

all: clean init test

venv/bin/activate:
	@rm -rf venv/
	@python3 -m venv venv

.PHONY: clean
clean:
	@rm -rf venv/

.PHONY: init
init: venv/bin/activate
	. venv/bin/activate ; \
	pip install -r requirements.txt

.PHONY: unit-test
unit-test: venv/bin/activate
	. venv/bin/activate ; \
	python3 -m unittest discover -s tests/unit

.PHONY: integration-test
integration-test: venv/bin/activate
	. venv/bin/activate ; \
	python3 -m unittest discover -s tests/integration

test: unit-test integration-test

.PHONY: jupyter
jupyter: venv/bin/activate
	. venv/bin/activate ; \
	jupyter lab
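
For reference, a typical workflow with these targets might look like the following (assuming python3 is on your PATH and requirements.txt pulls in jupyter):

    make init    # create venv/ and install requirements.txt into it
    make test    # run unit tests, then integration tests
    make jupyter # start JupyterLab from within the venv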
464 changes: 464 additions & 0 deletions Machine Learning/Online Search Relevance Metrics/README.md

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions Machine Learning/Online Search Relevance Metrics/bin/complete
@@ -0,0 +1,43 @@
#!venv/bin/python

"""
Generate auto-complete suggestions from a provided query prefix string.
"""

import argparse

from elasticsearch import Elasticsearch

DEFAULT_FUZZINESS = 0
DEFAULT_URL = 'http://localhost:9200'
INDEX = 'ecs-search-metrics_transform_completion'


def main():
    parser = argparse.ArgumentParser(prog='complete')
    parser.add_argument('--url', default=DEFAULT_URL, help="an Elasticsearch connection URL, e.g. http://user:secret@localhost:9200")
    parser.add_argument('--fuzziness', default=DEFAULT_FUZZINESS, help="amount of fuzziness for the completion suggester")
    parser.add_argument('query')
    args = parser.parse_args()

    es = Elasticsearch(args.url)
    results = es.search(index=INDEX, body={
        'suggest': {
            'suggest': {
                'prefix': args.query,
                'completion': {
                    'field': 'search.query.value.completion',
                    'fuzzy': {
                        'fuzziness': args.fuzziness,
                    },
                },
            },
        },
    })

    for suggestion in results['suggest']['suggest'][0]['options']:
        print(suggestion['text'])


if __name__ == "__main__":
    main()
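
A usage sketch, assuming the venv has been set up (e.g. via make init) and the completion transform index already exists; the prefix "ela" is just an illustrative example:

    # print fuzzy completion suggestions for a hypothetical prefix
    bin/complete --fuzziness 1 "ela"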
90 changes: 90 additions & 0 deletions Machine Learning/Online Search Relevance Metrics/bin/index
@@ -0,0 +1,90 @@
#!venv/bin/python

"""
Indexes NDJSON events from one or more files. Optionally provide a pipeline to pre-process events
at ingest time.
"""

import argparse
import os
import sys

from elasticsearch import Elasticsearch, helpers

# project library
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from metrics.resources import INDEX, Timer, load_json

DEFAULT_CHUNK_SIZE = 10000
DEFAULT_THREAD_COUNT = 4
DEFAULT_URL = 'http://localhost:9200'

SC_PIPELINE_NAMES = ['sc-click-events', 'sc-query-events']


def create_pipeline_from_file(es, filename):
    pipeline = load_json(filename)
    pipeline_name = os.path.splitext(os.path.basename(filename))[0]

    es.ingest.put_pipeline(pipeline_name, body=pipeline)

    return pipeline_name


def index_events(es, filenames, pipeline, thread_count=DEFAULT_THREAD_COUNT, chunk_size=DEFAULT_CHUNK_SIZE):
    """
    Indexes event docs from the given files using the given pipeline at ingest.
    We bulk index in parallel with large chunks, since the documents are
    very small, and use a generous timeout to let the job complete.
    """

    def actions():
        for filename in filenames:
            with open(filename, 'r') as f:
                for line in f:
                    yield {
                        '_index': INDEX,
                        'pipeline': pipeline,
                        '_source': line,
                    }

    print(f"Indexing events into '{INDEX}' with pipeline '{pipeline}'")

    with Timer() as t:
        for success, info in helpers.parallel_bulk(es, actions(), thread_count=thread_count, chunk_size=chunk_size, request_timeout=600, refresh='wait_for'):
            if not success:
                print(" - failure: ", info)

    print(f" - duration: {t.interval:.04f} sec")


def main():
    parser = argparse.ArgumentParser(prog='index')
    parser.add_argument('--url', default=DEFAULT_URL, help="An Elasticsearch connection URL, e.g. http://user:secret@localhost:9200")
    parser.add_argument('--pipeline', required=False, help="Pipeline config file to use for indexing events")
    parser.add_argument('--thread-count', type=int, required=False, default=DEFAULT_THREAD_COUNT, help="Number of threads for bulk indexing")
    parser.add_argument('--chunk-size', type=int, required=False, default=DEFAULT_CHUNK_SIZE, help="Chunk size for bulk indexing")
    parser.add_argument('events', nargs='+', help="The NDJSON file(s) or pattern containing events, e.g. data/events-*.ndjson")
    args = parser.parse_args()

    es = Elasticsearch(args.url)

    if args.pipeline:
        # create and use the specified indexing pipeline
        pipeline_name = create_pipeline_from_file(es, args.pipeline)
    else:
        # no pipeline specified; use the default, which shares its name with the index
        pipeline_name = INDEX

    index_events(es, args.events, pipeline_name, args.thread_count, args.chunk_size)

    # make index searchable
    es.indices.refresh(INDEX)

    # show index size
    index_size = es.count(index=INDEX)['count']
    print(f"Index size: {index_size}")


if __name__ == "__main__":
    main()
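
A hypothetical invocation, reusing the events pattern from the script's own help text; the pipeline config path below is an assumption for illustration, not a file confirmed by this diff:

    # pipeline path is hypothetical; its basename becomes the pipeline name
    bin/index --pipeline config/pipelines/sc-query-events.json data/events-*.ndjson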
38 changes: 38 additions & 0 deletions Machine Learning/Online Search Relevance Metrics/bin/kibana
@@ -0,0 +1,38 @@
#!venv/bin/python

"""
Sets up a Kibana index pattern for metrics and imports a pre-generated dashboard
and dependent visualizations.
"""

import argparse
import os
import requests
import sys

# project library
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from metrics.resources import load_config

DEFAULT_URL = 'http://localhost:5601'


def main():
    parser = argparse.ArgumentParser(prog='kibana')
    parser.add_argument('--url', default=DEFAULT_URL,
                        help="A Kibana connection URL, e.g. http://user:secret@localhost:5601")
    args = parser.parse_args()

    with requests.Session() as s:
        payload = load_config('kibana', 'dashboard')
        s.headers['kbn-xsrf'] = 'true'
        r = s.post(f'{args.url}/api/kibana/dashboards/import', json=payload)
        if r.status_code == 200:
            print("Done. Go to Kibana and load the dashboard 'Search Metrics'.")
        else:
            print(f"Got {r.status_code} instead of a 200 response from Kibana. Here's the response:")
            print(r.text)


if __name__ == "__main__":
    main()
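
A minimal invocation sketch, assuming Kibana is reachable on its default port:

    bin/kibana --url http://localhost:5601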
32 changes: 32 additions & 0 deletions Machine Learning/Online Search Relevance Metrics/bin/prepare
@@ -0,0 +1,32 @@
#!venv/bin/python

"""
Prepares the basic resources required: indices, pipelines, transforms.
"""

import argparse
import os
import sys

from elasticsearch import Elasticsearch

# project library
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from metrics.resources import prepare

DEFAULT_URL = 'http://localhost:9200'


def main():
    parser = argparse.ArgumentParser(prog='prepare')
    parser.add_argument('--url', default=DEFAULT_URL, help="An Elasticsearch connection URL, e.g. http://user:secret@localhost:9200")
    args = parser.parse_args()

    es = Elasticsearch(args.url)

    # create all resources
    prepare(es)


if __name__ == "__main__":
    main()
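
Usage is a single call against the target cluster, e.g.:

    bin/prepare --url http://localhost:9200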
88 changes: 88 additions & 0 deletions Machine Learning/Online Search Relevance Metrics/bin/simulate
@@ -0,0 +1,88 @@
#!venv/bin/python

"""
A script to generate simulated query and click logs. Events are emitted interleaved on a per-user basis,
with clicks in proper time order after their query. All events fall within a single 24-hour period.
TODO: Interleave "business goal" events with click events, since a business goal event would not normally
follow an unrelated click event.
"""

import argparse
import json
import os
import sys

from elasticsearch import Elasticsearch

# project library
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from metrics import simulate
from metrics.resources import INDEX, TRANSFORM_NAMES, prepare, start_transforms

DEFAULT_NUM_DOCS = 10000
DEFAULT_NUM_USERS = 100
DEFAULT_MAX_QUERIES = 10
DEFAULT_URL = 'http://localhost:9200'


def command_stdout(args):
    simulate.generate_events(
        args.num_documents,
        args.num_users,
        args.max_queries,
        lambda x: print(json.dumps(x)),
    )


def command_elasticsearch(args):
    es = Elasticsearch(args.url)

    # create all resources
    prepare(es)

    simulate.generate_events(
        args.num_documents,
        args.num_users,
        args.max_queries,
        lambda x: es.index(index=INDEX, pipeline=INDEX, body=x),
        with_progress=True,
    )

    # make index searchable
    es.indices.refresh(INDEX)

    # run transforms
    start_transforms(es, TRANSFORM_NAMES)

    # show index size
    index_size = es.count(index=INDEX)['count']
    print(f"Index size: {index_size}")


def main():
    parser = argparse.ArgumentParser(prog='simulate')

    parser.add_argument('--num-documents', type=int, default=DEFAULT_NUM_DOCS,
                        help="the number of documents in the corpus")
    parser.add_argument('--num-users', type=int, default=DEFAULT_NUM_USERS,
                        help="the number of users to generate queries for")
    parser.add_argument('--max-queries', type=int, default=DEFAULT_MAX_QUERIES,
                        help="the maximum number of queries per user")

    subparsers = parser.add_subparsers()

    stdout_subparser = subparsers.add_parser('stdout', help="write events to stdout")
    stdout_subparser.set_defaults(func=command_stdout)

    es_subparser = subparsers.add_parser('elasticsearch', help="write events to an Elasticsearch instance")
    es_subparser.add_argument('--url', default=DEFAULT_URL,
                              help="an Elasticsearch connection URL, e.g. http://user:secret@localhost:9200")
    es_subparser.set_defaults(func=command_elasticsearch)

    args = parser.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()
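
Two invocation sketches, one per subcommand; note that the simulation flags precede the subcommand because they live on the top-level parser:

    # events.ndjson is an arbitrary output file name
    bin/simulate stdout > events.ndjson
    bin/simulate --num-users 500 elasticsearch --url http://localhost:9200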