Skip to content

Commit

Permalink
Remove partition mapping and custom io manager
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Apr 29, 2024
1 parent b5daee1 commit 306439c
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 21 deletions.
27 changes: 9 additions & 18 deletions with_openai/assets.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os
import pickle
from typing import Dict, List, Any

from dagster import (
Expand All @@ -11,9 +10,7 @@
AllPartitionMapping,
AssetIn
)
from dagster_aws.s3 import S3Resource
from dagster_openai import OpenAIResource
from filelock import FileLock
from langchain.chains.qa_with_sources import stuff_prompt
#from langchain_openai import ChatOpenAI
from langchain.chat_models.openai import ChatOpenAI
Expand All @@ -25,7 +22,7 @@
#from langchain_community.vectorstores import FAISS
from langchain.vectorstores.faiss import FAISS

from .constants import SEARCH_INDEX_FILE, SUMMARY_TEMPLATE
from .constants import SUMMARY_TEMPLATE
from .utils import get_github_docs

docs_partitions_def = StaticPartitionsDefinition(
Expand All @@ -43,23 +40,18 @@
)

if bool(os.getenv("DAGSTER_IS_DEV_CLI")):
io_manager_key = "fs_io_manager"
else:
io_manager_key = "io_manager"
else:
io_manager_key = "s3_io_manager"


@asset
def s3_asset(context: AssetExecutionContext, s3: S3Resource):
    # Ask the S3 client for its bucket listing and write the raw response to
    # the asset's log at info level. Nothing is returned.
    bucket_listing = s3.get_client().list_buckets()
    context.log.info(bucket_listing)

# io_manager_key="fs_io_manager"
@asset(compute_kind="GitHub", partitions_def=docs_partitions_def, io_manager_key=io_manager_key)
@asset(compute_kind="GitHub", partitions_def=docs_partitions_def)
def source_docs(context: AssetExecutionContext):
    # Call get_github_docs for the "dagster-io" / "dagster" pair, passing the
    # current partition key as the third argument, and materialize whatever it
    # yields into a list so the asset returns a concrete collection.
    fetched = get_github_docs("dagster-io", "dagster", context.partition_key)
    return list(fetched)


# io_manager_key="search_index_io_manager"
@asset(compute_kind="OpenAI", partitions_def=docs_partitions_def, io_manager_key=io_manager_key)
@asset(compute_kind="OpenAI", partitions_def=docs_partitions_def)
def search_index(context: AssetExecutionContext, openai: OpenAIResource, source_docs: List[Any]):
source_chunks = []
splitter = CharacterTextSplitter(separator=" ", chunk_size=1024, chunk_overlap=0)
Expand All @@ -75,18 +67,17 @@ def search_index(context: AssetExecutionContext, openai: OpenAIResource, source_

return search_index.serialize_to_bytes()


# Config subclass declaring two string fields, neither with a default value.
class OpenAIConfig(Config):
    # presumably the OpenAI model name to use; confirm against the consumer
    model: str
    # presumably the question text to be answered; confirm against the consumer
    question: str


# io_manager_key="fs_io_manager"
@asset(
compute_kind="OpenAI",
ins={
"search_index": AssetIn(partition_mapping=AllPartitionMapping()),
},
io_manager_key=io_manager_key
#ins={
# "search_index": AssetIn(partition_mapping=AllPartitionMapping()),
#},
)
def completion(
context: AssetExecutionContext,
Expand Down
8 changes: 5 additions & 3 deletions with_openai/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
sensor,
FilesystemIOManager
)
from dagster_aws.s3 import S3Resource
from dagster_aws.s3 import S3PickleIOManager, S3Resource
from dagster_openai import OpenAIResource

from . import assets
Expand Down Expand Up @@ -56,8 +56,10 @@ def question_sensor(context):
jobs=all_jobs,
resources={
"openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
"fs_io_manager": FilesystemIOManager(),
"s3": S3Resource(),
#"s3_io_manager": S3PickleIOManager(
# s3_resource=S3Resource(),
# s3_bucket="with_openai",
#)
},
sensors=all_sensors,
)

0 comments on commit 306439c

Please sign in to comment.