Skip to content
On this page

Middleware

中间件在@step中是一个可选的组件,它可以在任务执行前后做一些事情。

其中内置四个方法来hook任务执行中的行为:

  • before_send:消息发送之前
  • after_send:消息发送之后
  • before_consume:消费消息之前
  • after_consume:消费消息之后
python
class BaseMiddleware:

    def before_send(self, step, message, *args, **kwargs):
        """消息发送之前"""

    def after_send(self, step, message, *args, **kwargs):
        """消息发送之后"""

    def before_consume(self, step, message, *args, **kwargs):
        """消费消息之前"""

    def after_consume(self, step, message, *args, **kwargs):
        """消费消息之后"""

跳过后续中间件

当你想在中间件中判断某些条件,如果满足条件则跳过后续中间件,可以直接抛出StopMiddleware异常。

python
from onestep import BaseMiddleware, StopMiddleware

class MyMiddleware(BaseMiddleware):

    def before_consume(self, step, message, *args, **kwargs):
        raise StopMiddleware()

丢弃消息

当你想在中间件中判断某些条件,如果满足条件则丢弃消息,可以直接抛出DropMessage异常。

内置UniqueMiddleware就是这样的一个中间件,它会判断消息是否已经被消费过,如果已经被消费过则丢弃消息。

python
class UniqueMiddleware(BaseMiddleware):
    default_hash_func = hash_func()

    def before_consume(self, step, message, *args, **kwargs):
        message_hash = self.default_hash_func(message.body)
        if self.has_seen(message_hash):
            raise DropMessage(f"Message<{message}> has been seen before")

自定义中间件

⚠️继承关系

自定义中间件必须继承自BaseMiddleware

你可以在任务的流转过程中,自定义中间件来做一些事情。

python
from onestep import BaseMiddleware

class MyMiddleware(BaseMiddleware):

    def before_consume(self, step, message, *args, **kwargs):
        print("before consume")

    def after_consume(self, step, message, *args, **kwargs):
        print("after consume")

然后在@step中使用middlewares参数来指定中间件。

python
from onestep import step

@step(from_broker=..., middlewares=[MyMiddleware()])
def my_task(message):
    print("my task")

Released under the MIT License.