Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature:add Jaeger to provide distributed tracing #70

Merged
merged 7 commits into from
Mar 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/search_vector/consts/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@
KAFKA_CONSUMER_VECTOR_INDEX_TOPIC = "search-engine-csv-loader-topic"

VECTOR_RECALL_TOPK = 20

OTEL_ENDPOINT = "127.0.0.1:4317"

SERVICE_NAME = "tangseng-python"
8 changes: 6 additions & 2 deletions app/search_vector/kafka_operate/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
# under the License.

"""store vector index from kafka"""
import inspect
from opentelemetry import trace
from ..kafka_operate.kafka_operate import kafka_helper


def store_data_from_kafka(kafka_topic, milvus_table_name):
    """
    Consume messages from the given Kafka topic and store them in Milvus
    for building the vector index.

    :param kafka_topic: Kafka topic to subscribe the consumer to.
    :param milvus_table_name: Milvus collection that receives the documents.
    """
    tracer = trace.get_tracer(__name__)
    # Use a literal span name: it is identical to what
    # inspect.getframeinfo(inspect.currentframe()).function returned, but
    # avoids a frame inspection on every call.
    with tracer.start_as_current_span("store_data_from_kafka"):
        kafka_helper.connect_consumer(kafka_topic)
        kafka_helper.consume_messages_store_milvus(milvus_table_name)
51 changes: 29 additions & 22 deletions app/search_vector/kafka_operate/kafka_operate.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
# under the License.

"""kafka operate"""
import inspect
import json
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from opentelemetry import trace
from ..config.config import KAFKA_CLUSTER
from ..milvus import milvus
from ..milvus.operators import do_upload

tracer = trace.get_tracer(__name__)


class KafkaHelper:
"""
Expand All @@ -39,25 +43,27 @@ def connect_producer(self):
"""
connect kafka producer
"""
try:
self.producer = KafkaProducer(
bootstrap_servers=self.bootstrap_servers)
print("Connected to Kafka producer successfully.")
except KafkaError as e:
print(f"Failed to connect to Kafka producer: {e}")
with tracer.start_as_current_span(inspect.getframeinfo(inspect.currentframe()).function):
try:
self.producer = KafkaProducer(
bootstrap_servers=self.bootstrap_servers)
print("Connected to Kafka producer successfully.")
except KafkaError as e:
print(f"Failed to connect to Kafka producer: {e}")

def connect_consumer(self, topic):
"""
connect kafka consumer
"""
try:
self.consumer = KafkaConsumer(
topic, bootstrap_servers=self.bootstrap_servers)
print(
f"Connected to Kafka consumer successfully. Listening to topic: {topic}"
)
except KafkaError as e:
print(f"Failed to connect to Kafka consumer: {e}")
with tracer.start_as_current_span(inspect.getframeinfo(inspect.currentframe()).function):
try:
self.consumer = KafkaConsumer(
topic, bootstrap_servers=self.bootstrap_servers)
print(
f"Connected to Kafka consumer successfully. Listening to topic: {topic}"
)
except KafkaError as e:
print(f"Failed to connect to Kafka consumer: {e}")

def send_message(self, topic, msg):
"""
Expand Down Expand Up @@ -88,14 +94,15 @@ def consume_messages_store_milvus(self, milvus_table):
"""
consume messages from kafka and store in milvus
"""
if not self.consumer:
print("No Kafka consumer connected.")
return
print("Consuming messages...")
for msg in self.consumer:
data = json.loads(msg.value.decode('utf-8'))
do_upload(milvus_table, int(data["doc_id"]), data["title"],
data["body"], self.milvus_client)
with tracer.start_as_current_span(inspect.getframeinfo(inspect.currentframe()).function):
if not self.consumer:
print("No Kafka consumer connected.")
return
print("Consuming messages...")
for msg in self.consumer:
data = json.loads(msg.value.decode('utf-8'))
do_upload(milvus_table, int(data["doc_id"]), data["title"],
data["body"], self.milvus_client)

def on_send_success(self, record_metadata):
"""
Expand Down
39 changes: 22 additions & 17 deletions app/search_vector/service/search_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
# under the License.

"""search vector grpc service"""
import inspect
import json
import grpc
import logging
import asyncio

from opentelemetry import trace
from ..consts.consts import VECTOR_RECALL_TOPK
from idl.pb.search_vector import search_vector_pb2
from ..config.config import DEFAULT_MILVUS_TABLE_NAME, VECTOR_ADDR
Expand All @@ -29,6 +31,8 @@
from ..milvus.milvus import milvus_client
from idl.pb.search_vector import search_vector_pb2_grpc

tracer = trace.get_tracer(__name__)


class SearchVectorService(search_vector_pb2_grpc.SearchVectorServiceServicer):
"""
Expand All @@ -37,23 +41,24 @@ class SearchVectorService(search_vector_pb2_grpc.SearchVectorServiceServicer):

def SearchVector(self, request,
context) -> search_vector_pb2.SearchVectorResponse:
try:
queryies = request.query
doc_ids = []
for query in queryies:
ids, distants = do_search(DEFAULT_MILVUS_TABLE_NAME, query,
VECTOR_RECALL_TOPK, milvus_client)
print("search vector ids", ids)
doc_ids += ids
print("search vector data", doc_ids)
return search_vector_pb2.SearchVectorResponse(code=200,
doc_ids=doc_ids,
msg='ok',
error='')
except Exception as e:
print("search vector error", e)
return search_vector_pb2.SearchVectorResponse(code=500,
error=str(e))
with tracer.start_as_current_span(inspect.getframeinfo(inspect.currentframe()).function):
try:
queryies = request.query
doc_ids = []
for query in queryies:
ids, distants = do_search(DEFAULT_MILVUS_TABLE_NAME, query,
VECTOR_RECALL_TOPK, milvus_client)
print("search vector ids", ids)
doc_ids += ids
print("search vector data", doc_ids)
return search_vector_pb2.SearchVectorResponse(code=200,
doc_ids=doc_ids,
msg='ok',
error='')
except Exception as e:
print("search vector error", e)
return search_vector_pb2.SearchVectorResponse(code=500,
error=str(e))


async def serve() -> None:
Expand Down
17 changes: 17 additions & 0 deletions app/search_vector/tracing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

48 changes: 48 additions & 0 deletions app/search_vector/tracing/tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import SERVICE_NAME
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.b3 import B3MultiFormat


def init_tracer_provider(url, service_name):
    """
    Configure the global OpenTelemetry tracer provider.

    Creates an OTLP/gRPC span exporter pointed at *url* over an insecure
    (plaintext) channel, tags exported spans with *service_name* via the
    ``service.name`` resource attribute, installs a batching span
    processor, and registers the B3 multi-header format as the global
    context propagator.

    :param url: OTLP collector endpoint, e.g. "127.0.0.1:4317".
    :param service_name: value for the ``service.name`` resource attribute.
    """
    # Create the OTLP exporter (insecure gRPC channel, no TLS).
    exporter = OTLPSpanExporter(
        insecure=True,
        endpoint=url,
    )

    resource = Resource(attributes={
        SERVICE_NAME: service_name,
    })

    # Install the provider globally so trace.get_tracer() picks it up.
    provider = TracerProvider(
        resource=resource,
    )
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)

    # Set the global propagator: B3 multi-header format.
    b3_propagator = B3MultiFormat()
    set_global_textmap(b3_propagator)
8 changes: 7 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@
from PIL import Image
from flask import Flask, request
from torchvision import transforms
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from app.search_vector.service.search_vector import serve

from app.search_vector.consts.consts import OTEL_ENDPOINT
from app.search_vector.consts.consts import SERVICE_NAME
from app.search_vector.config.config import DEFAULT_MILVUS_TABLE_NAME, NETWORK_MODEL_NAME
from app.search_vector.cirtorch.datasets.datahelpers import imresize
from app.search_vector.cirtorch.networks.imageretrievalnet import init_network
from app.search_vector.milvus.milvus import milvus_client
from app.search_vector.milvus.operators import do_upload, do_search
from app.search_vector.utils.logs import LOGGER
from app.search_vector.tracing.tracing import init_tracer_provider

app = Flask(__name__)

Expand Down Expand Up @@ -199,6 +202,9 @@ def init_model():
net, lsh, transform = init_model()

if __name__ == "__main__":
    # Set up the global tracer before serving so spans reach the collector.
    init_tracer_provider(url=OTEL_ENDPOINT, service_name=SERVICE_NAME)
    # FlaskInstrumentor would trace inbound HTTP if the Flask app were run:
    # FlaskInstrumentor().instrument_app(app)
    # app.run(host=WEBSITE_HOST, port=WEBSITE_PORT, debug=True)
    # print("start server {}:{}".format(WEBSITE_HOST, WEBSITE_PORT))
    asyncio.run(serve())
9 changes: 9 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,12 @@ ujson==5.8.0
urllib3==2.0.4
Werkzeug==2.3.7
zipp==3.17.0
opentelemetry-api==1.23.0
opentelemetry-sdk==1.23.0
opentelemetry-exporter-otlp-proto-grpc==1.23.0
opentelemetry-propagator-b3==1.23.0
opentelemetry-instrumentation-flask==0.44b0




14 changes: 9 additions & 5 deletions vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
# under the License.

"""the script file is to handle vector index from kafka"""
import inspect
import threading
from opentelemetry import trace
from app.search_vector.consts.consts import KAFKA_CONSUMER_VECTOR_INDEX_TOPIC
from app.search_vector.config.config import DEFAULT_MILVUS_TABLE_NAME
from app.search_vector.kafka_operate.consumer import store_data_from_kafka
def consume_inverted_index():
    """
    Start a worker thread that consumes the vector-index Kafka topic and
    stores the documents into the default Milvus table, then wait for it
    to finish.
    """
    topic = KAFKA_CONSUMER_VECTOR_INDEX_TOPIC
    table_name = DEFAULT_MILVUS_TABLE_NAME
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("consume_inverted_index"):
        # BUG FIX: the original wrote
        #   threading.Thread(target=store_data_from_kafka(topic, table_name))
        # which CALLS the function synchronously in this thread and hands
        # its return value (None) to Thread. Pass the callable and its
        # arguments separately so the work runs on the worker thread.
        thread = threading.Thread(target=store_data_from_kafka,
                                  args=(topic, table_name))
        thread.start()
        print("start consume inverted index")
        thread.join()


if __name__ == "__main__":
Expand Down
Loading