AttributeError: 'bool' object has no attribute 'lower' #386

Closed
ZhymabekRoman opened this issue Nov 29, 2024 · 4 comments · Fixed by #390
Comments

@ZhymabekRoman
Contributor

Hello @s3rius! Thank you for creating taskiq.

I encountered an exception (something like AttributeError: 'bool' object has no attribute 'lower') with taskiq when attempting to use my custom SimpleRetryMiddleware. The problem occurred with the following code:

@taskiq_broker.task(retry_on_error=True)
async def _process_not_reached_deal(deal_details: BitrixDealResultData) -> Optional[bool]:

Unfortunately, I don't have the full traceback of the exception to provide for troubleshooting.
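
For reference, here is a minimal sketch of how the error can arise (hypothetical, since the traceback was lost): if a label survives as an actual bool instead of its serialized string form, calling .lower() on it raises exactly this error.

retry_on_error = True   # label kept as a real bool, never serialized to "True"
retry_on_error.lower()  # AttributeError: 'bool' object has no attribute 'lower'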

I've implemented a fix, and the code is now working correctly:

import base64
from typing import Any, Callable, Dict

# LabelType is taskiq's enum of supported label types; the BOOL parser now
# accepts both real booleans and their serialized string form.
_LABEL_PARSERS: Dict[LabelType, Callable[[str], Any]] = {
    LabelType.INT: int,
    LabelType.STR: str,
    LabelType.FLOAT: float,
    LabelType.BOOL: lambda x: str(x).lower() == "true",
    LabelType.BYTES: base64.b64decode,
    LabelType.ANY: lambda x: x,
}

However, I'm not entirely sure whether the boolean conversion logic (lambda x: str(x).lower() == "true") is the most appropriate approach. I'd appreciate your thoughts on this implementation.

@s3rius
Member

s3rius commented Dec 13, 2024

Thanks for finding this and making a fix. That case is actually quite interesting, because labels should always be strings during transmission and serialization. Can you please provide more details about your setup?

Also, I think this fix will work: for strings, str() is a no-op, while for actual booleans it converts them correctly. Can you publish a PR? Or I can do it myself.
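
A quick sanity check of the proposed parser bears this out (an illustrative sketch, not code from taskiq):

parse_bool = lambda x: str(x).lower() == "true"

assert parse_bool("True") is True   # serialized label: str() is a no-op
assert parse_bool("false") is False
assert parse_bool(True) is True     # raw bool that skipped serialization
assert parse_bool(False) is False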

@ZhymabekRoman
Contributor Author

I have been unable to reproduce the problem with a minimal code example. Could the issue be related to the taskiq-litestar or taskiq-pipelines integration libraries?

ZhymabekRoman added a commit to ZhymabekRoman/taskiq that referenced this issue Dec 15, 2024
@s3rius
Member

s3rius commented Dec 15, 2024

It actually should not.

@ZhymabekRoman
Contributor Author

ZhymabekRoman commented Dec 16, 2024

I suppose the problem might be somewhere in here:

Custom retry middleware that re-schedules failed tasks with a delay
from datetime import datetime, timedelta
from logging import getLogger
from typing import Any, Optional

from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.abc.schedule_source import ScheduleSource
from taskiq.exceptions import NoResultError
from taskiq.kicker import AsyncKicker
from taskiq.message import TaskiqMessage
from taskiq.result import TaskiqResult

logger = getLogger("taskiq.retry_middleware")


class SimpleRetryMiddleware(TaskiqMiddleware):
  """Middleware to add retries."""

  def __init__(
      self,
      default_retry_count: int = 3,
      default_retry_label: bool = False,
      no_result_on_retry: bool = True,
      retry_delay: int = 60,
  ) -> None:
      self.schedule_source: Optional[ScheduleSource] = None
      self.default_retry_count = default_retry_count
      self.default_retry_label = default_retry_label
      self.no_result_on_retry = no_result_on_retry
      self.retry_delay = retry_delay

  def set_schedule_source(self, source: ScheduleSource) -> None:
      self.schedule_source = source

  async def on_error(
      self,
      message: "TaskiqMessage",
      result: "TaskiqResult[Any]",
      exception: BaseException,
  ) -> None:
      """Retry on error.

      This middleware is used to retry
      tasks on errors.

      If error is found during the execution
      this function is invoked.

      :param message: Message that caused the error.
      :param result: execution result.
      :param exception: found exception.
      """
      # Valid exception
      if isinstance(exception, NoResultError):
          return

      retry_on_error = message.labels.get("retry_on_error")
      if isinstance(retry_on_error, str):
          retry_on_error = retry_on_error.lower() == "true"

      retry_delay: int = int(message.labels.get("retry_delay", self.retry_delay))

      if retry_on_error is None:
          retry_on_error = self.default_retry_label
      # Check if retrying is enabled for the task.
      if not retry_on_error:
          return

      # This didn't work out:
      # schedule_label = datetime.utcnow() + timedelta(seconds=self.retry_delay)
      # message.labels["time"] = schedule_label

      kicker: AsyncKicker[Any, Any] = AsyncKicker(
          task_name=message.task_name,
          broker=self.broker,
          labels=message.labels,
      ).with_task_id(message.task_id)

      # Getting number of previous retries.
      retries = int(message.labels.get("_retries", 0)) + 1
      kicker.with_labels(_retries=retries)
      max_retries = int(message.labels.get("max_retries", self.default_retry_count))

      if retries < max_retries:
          schedule_time = datetime.now() + timedelta(seconds=retry_delay)
          logger.info(
              f"Task '{message.task_name}' invocation failed. Retrying in {retry_delay} seconds. Retry {retries}/{max_retries}, schedule time: {schedule_time}",  # noqa: E501
          )

          logger.info(f"Schedule source: {self.schedule_source}")
          if self.schedule_source is None:
              logger.error("Schedule source is not set, retrying immediately")
              await kicker.kiq(
                  *message.args,
                  **message.kwargs,
              )
              return

          await kicker.schedule_by_time(
              self.schedule_source,
              schedule_time,
              *message.args,
              **message.kwargs,
          )

          if self.no_result_on_retry:
              result.error = NoResultError()

      else:
          logger.warning(
              "Task '%s' invocation failed. Maximum retries count is reached.",
              message.task_name,
          )

I suppose the problem is that I didn't use prepare_label somewhere here :/
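
One defensive option (a sketch of my own, not an existing taskiq helper) would be to normalize the label before comparing, so both the serialized string form and a raw bool are accepted:

def parse_bool_label(value: Any, default: bool = False) -> bool:
    """Coerce a label value to bool, accepting raw bools and "true"/"True" strings."""
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    return str(value).lower() == "true"

# In on_error, this would replace the isinstance check:
# retry_on_error = parse_bool_label(
#     message.labels.get("retry_on_error"), self.default_retry_label
# )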

Additional: taskiq broker initialization logic
from typing import Any

from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource

# from taskiq_nats import NatsBroker
from taskiq_pipelines.middleware import PipelineMiddleware
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend, RedisScheduleSource

from branched_call_bot.config import cnf
from branched_call_bot.services.taskiq import SimpleRetryMiddleware

# from .redis import async_redis_pool

redis_async_result: RedisAsyncResultBackend[Any] = RedisAsyncResultBackend(
    redis_url=cnf.REDIS.url,
)

retry_middleware = SimpleRetryMiddleware(
    default_retry_count=cnf.TASKIQ.RETRY_COUNT,
    retry_delay=cnf.TASKIQ.RETRY_DELAY,
)

pipeline_middleware = PipelineMiddleware()
# nats_broker = NatsBroker(
#         servers=cnf.NATS.SERVERS,
#         queue=cnf.NATS.QUEUE,
#     )

redis_broker = ListQueueBroker(cnf.REDIS.url)

taskiq_broker = redis_broker.with_middlewares(
    pipeline_middleware,
    retry_middleware,
).with_result_backend(redis_async_result)


label_scheduler_source = LabelScheduleSource(taskiq_broker)
redis_scheduler_source = RedisScheduleSource(url=cnf.REDIS.url)

retry_middleware.set_schedule_source(redis_scheduler_source)  # type: ignore

taskiq_scheduler = TaskiqScheduler(
    broker=taskiq_broker,
    sources=[redis_scheduler_source, label_scheduler_source],
)

# taskiq_litestar.init(
#     broker=taskiq_broker,
#     app="branched_call_bot.clients.litestar:app",
# )
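
For context, a task opts in to retries through decorator labels, e.g. (a hypothetical task mirroring the one from my first message):

@taskiq_broker.task(retry_on_error=True, max_retries=5, retry_delay=30)
async def flaky_task(payload: dict) -> None:
    # Any exception other than NoResultError raised here triggers
    # SimpleRetryMiddleware.on_error, which re-schedules the task via
    # redis_scheduler_source after retry_delay seconds.
    ...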
