malloy-py throwing error every time #144

Open
bernardro opened this issue Nov 12, 2024 · 1 comment

Comments

@bernardro

bernardro commented Nov 12, 2024

What happens?

With dagster 1.9.1 (reproduction steps below), the following error is thrown on every run:

Channel not in ready state

This happens even with the hacky nest_asyncio workaround applied:

import nest_asyncio
nest_asyncio.apply()

Error details:

malloy.runtime.MalloyRuntimeError: ('Channel not in ready state', <ChannelConnectivity.TRANSIENT_FAILURE: (3, 'transient failure')>)
  File "/home/rau/.pyenv/versions/vnv/lib/python3.12/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/home/rau/.pyenv/versions/vnv/lib/python3.12/site-packages/dagster/_utils/__init__.py", line 482, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/home/rau/.pyenv/versions/vnv/lib/python3.12/site-packages/dagster/_core/execution/plan/compute.py", line 133, in gen_from_async_gen
    yield loop.run_until_complete(async_gen.__anext__())
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/rau/.pyenv/versions/vnv/lib/python3.12/site-packages/nest_asyncio.py", line 98, in run_until_complete
    return f.result()
           ^^^^^^^^^^
  File "/home/rau/.pyenv/versions/3.12.7/lib/python3.12/asyncio/futures.py", line 203, in result
    raise self._exception.with_traceback(self._exception_tb)
  File "/home/rau/.pyenv/versions/3.12.7/lib/python3.12/asyncio/tasks.py", line 314, in __step_run_and_handle_result
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  File "/home/rau/KVMSHARE/WS/SP/DEV/dgstr/dgstr/assets/ovtr/async_test1.py", line 77, in asset2
    data = await run_mloy(context)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/rau/KVMSHARE/WS/SP/DEV/dgstr/dgstr/assets/ovtr/async_test1.py", line 40, in run_mloy
    return await runtime.load_file(mloy_file_path).run(named_query="people_name_yr")
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/rau/.pyenv/versions/vnv/lib/python3.12/site-packages/malloy/runtime.py", line 150, in run
    [sql, connection_name] = await self.get_sql(query=query,
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/rau/.pyenv/versions/vnv/lib/python3.12/site-packages/malloy/runtime.py", line 111, in get_sql
    return await self.compile_malloy(named_query=named_query, query=query)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/rau/.pyenv/versions/vnv/lib/python3.12/site-packages/malloy/runtime.py", line 142, in compile_malloy
    raise MalloyRuntimeError("Channel not in ready state", state)

To Reproduce

Try the simplest example from the Malloy Python docs:

mkdir proj && cd proj
pyenv local 3.12.7

pip install dagster
dagster project scaffold --name testmalloyp
cd testmalloyp

pip install -e ".[dev]"
pip install malloy

Right away, pip reports a conflict between the protobuf versions that dagster and malloy require:

dagster 1.9.1 requires protobuf<5,>=4; python_version >= "3.11", but you have protobuf 5.28.3 which is incompatible
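
To confirm which protobuf version actually ended up in the environment, a small check like the one below may help. It is only a sketch: the helper names are made up for illustration, the `>=4,<5` range is copied from the pip warning above, and the comparison looks at the major version only. Nothing in this thread establishes that the protobuf conflict is what causes the channel error.

```python
from importlib import metadata
from typing import Optional


def major_version(version: str) -> int:
    """Return the leading numeric component of a version string like '5.28.3'."""
    return int(version.split(".")[0])


def satisfies_dagster_pin(version: str) -> bool:
    """True if `version` is inside protobuf>=4,<5, the range quoted by pip.

    Simplification: only the major version is compared.
    """
    return 4 <= major_version(version) < 5


def installed_protobuf() -> Optional[str]:
    """Return the installed protobuf version, or None if it is not installed."""
    try:
        return metadata.version("protobuf")
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_protobuf()
    if found is None:
        print("protobuf is not installed in this environment")
    else:
        verdict = "inside" if satisfies_dagster_pin(found) else "outside"
        print(f"protobuf {found} is {verdict} the range dagster 1.9.1 asks for")
```

Running this inside the same virtual environment that `dagster dev` uses shows whether the version pip complained about is the one in effect.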

Continuing anyway:

# assets.py (generated for you) - define an asset (think table of processed data)
import malloy
import dagster

...

async def run_mloy():
    with malloy.Runtime() as runtime:
        runtime.add_connection(DuckDbConnection())
        return await runtime.load_file('./imdb_data/names.parquet').run(named_query='query: people -> { select: primaryName, birthYear }')

asst_spec = AssetCheckSpec(name="my_check", asset="asset1")
@asset(check_specs=[asst_spec])
async def asset1(context):
    context.log.info(f"starting generation of asset 1")
    data = await run_mloy()
    dataframe = data.to_dataframe()
    print(dataframe)

    yield Output("asset1 output", output_name="result")
    yield AssetCheckResult(passed=True, description="Asset check passed...")

# Start the dagster web console to manage assets, usually at http://127.0.0.1:3000
dagster dev

Click the 'Assets' link, then click 'Materialize' to trigger the Malloy code above, and check for errors.

OS:

Fedora 40

Malloy Client:

Python package @ 639dc84

Malloy Client Version:

1.9.1

Database Connection:

DuckDB

@gmaden
Collaborator

gmaden commented Nov 18, 2024

I was able to run this successfully, after fixing a few things in your steps and adding some that were omitted.

From the error message you're reporting, it looks like the malloy-service that does compilation is not being spun up properly and is not reporting as reachable. I would verify that you can run malloy directly, outside dagster, and that the malloy-service binary is not being blocked from executing.
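
One way to start on the second check is to look for service binaries inside the installed package and see whether they carry an execute bit. The sketch below rests on assumptions: that the binaries ship somewhere under the installed `malloy` package directory and that their file names begin with `malloy-service`. Both helper names are hypothetical. It inspects permission bits only, so it will not detect other blockers such as a `noexec` mount or a security policy.

```python
import importlib.util
from pathlib import Path
from typing import List, Optional, Tuple


def find_service_binaries(
    root: Path, prefix: str = "malloy-service"
) -> List[Tuple[Path, bool]]:
    """List files under `root` whose name starts with `prefix`.

    Each entry is (path, has_execute_bit). The prefix is an assumption
    about how the service binary is named.
    """
    results = []
    for path in sorted(root.rglob(f"{prefix}*")):
        if path.is_file():
            has_execute_bit = bool(path.stat().st_mode & 0o111)
            results.append((path, has_execute_bit))
    return results


def installed_malloy_root() -> Optional[Path]:
    """Return the directory of the installed malloy package, or None."""
    spec = importlib.util.find_spec("malloy")
    if spec is None or spec.origin is None:
        return None
    return Path(spec.origin).parent


if __name__ == "__main__":
    root = installed_malloy_root()
    if root is None:
        print("malloy is not installed in this environment")
    else:
        for path, ok in find_service_binaries(root):
            print(f"{'executable' if ok else 'NOT executable'}: {path}")
```

If nothing is listed, the naming assumption is probably wrong for the installed version, and the package directory is worth inspecting by hand.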

My repro steps:

  • Added pip install for numpy and pandas, since both are required when using dataframes.
  • Cloned malloy-samples into a sibling directory of the testmalloyp project.
  • Used Python 3.11.9 on my test machine.

mkdir proj && cd proj
python3 -m venv .venv
source .venv/bin/activate

pip install dagster
dagster project scaffold --name testmalloyp
git clone https://github.com/malloydata/malloy-samples.git
cd testmalloyp

pip install -e ".[dev]"
pip install malloy
pip install numpy
pip install pandas

Updated assets.py

  • Set the home directory for DuckDbConnection.
  • runtime.load_file(...) expects a Malloy source file, not a parquet file directly.
  • It looks like you're trying to run a custom query rather than a named query, so I changed that call to pass query= instead of named_query=.

import malloy
import os
from pathlib import Path
from malloy.data.duckdb.duckdb_connection import DuckDbConnection
from dagster import asset, asset_check, AssetCheckResult, AssetCheckSpec, Output


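async def run_mloy():
    with malloy.Runtime() as runtime:
        # Resolve the imdb sample directory relative to this file
        # (malloy-samples is cloned next to the testmalloyp project).
        current_directory = os.path.dirname(os.path.abspath(__file__))
        home_dir = Path(current_directory, '../../malloy-samples/imdb')
        runtime.add_connection(DuckDbConnection(home_dir=home_dir))
        # Load the Malloy model file, then run an inline query against it.
        return await runtime.load_file(f'{home_dir}/imdb.malloy').run(query='run: people -> { select: primaryName, birthYear }')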

asst_spec = AssetCheckSpec(name="my_check", asset="asset1")
@asset(check_specs=[asst_spec])
async def asset1(context):
    context.log.info("starting generation of asset 1")
    data = await run_mloy()
    dataframe = data.to_dataframe()
    print(dataframe)

    yield Output("asset1 output", output_name="result")
    yield AssetCheckResult(passed=True, description="Asset check passed...")
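
The distinction between a named query and a custom query can be sketched as follows. This is an illustration rather than the library's documented usage: `run_people` is a hypothetical helper, the `named_query=` and `query=` keywords are taken from the calls and traceback in this thread, and `people_name_yr` is the query name from the original report, which may not exist in imdb.malloy. The malloy imports sit inside the function so the sketch can be loaded without the package installed.

```python
import asyncio
from pathlib import Path
from typing import Optional


async def run_people(model_path: Path, home_dir: Path, named: Optional[str] = None):
    """Run a named query if `named` is given, otherwise an inline query."""
    # Imported lazily; both imports mirror the ones used in assets.py above.
    import malloy
    from malloy.data.duckdb.duckdb_connection import DuckDbConnection

    with malloy.Runtime() as runtime:
        runtime.add_connection(DuckDbConnection(home_dir=home_dir))
        model = runtime.load_file(str(model_path))
        if named is not None:
            # named_query: the name of a query already defined in the .malloy file.
            return await model.run(named_query=named)
        # query: Malloy source text, compiled and run as-is.
        return await model.run(
            query="run: people -> { select: primaryName, birthYear }"
        )
```

Calling `asyncio.run(run_people(model, home))` from a plain script, with no dagster involved, is also a quick way to check that malloy runs on its own.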

Run dagster

dagster dev

Then I clicked Assets, opened the dropdown in the asset1 row, and selected Materialize.
