首版于
2025-02-21 20:55
保质期
新鲜度
0%
公司的支付系统提供了「同步支付」和「回调支付」两种模式。原来使用的是「同步支付」 ,在前端调起支付后,后端调用支付接口并阻塞,直到拿到支付系统的响应;如果超时或失败,则调起 celery 定时器对订单状态进行重试轮询。
现在用户量激增,为了避免阻塞后端,需要改为「回调支付」,也就是前端调起支付后,后端调用支付接口并立即返回,之后在回调中改写订单状态并通过 WebSocket 通知前端成功与否。在老大的介绍下,我选择了 Django Channels。
当心
这里指的是 channels
而非 django-channels
,后者早在十年前就已经停更了。
注意
如果只是简单接收 WebSocket 并响应,按照教程配置即可,下面配置的是后端主动通知前端的场景。
python -m pip install -U 'channels[daphne]'
对于旧版本的 Python:
python -m pip install -U channels
python -m pip install -U daphne
下面以 ws://localhost:8000/ws/order/{biz}/ 为例。
消费者 Consumer 是类似视图 View 一样的存在。
在 APP 下创建一个 consumers.py
文件,创建一个订单通知的异步消费者 OrderConsumer
。
当然啦,也可以直接放在 views.py
里。另外,
room_name
有字符要求,不过写错了会有报错提示。scope['url_route']
可以拿到 args
和 kwargs
,跟视图一样是来自路由的。send_data_to_frontend
可以随意改名,只要跟方法对得上就好。channel_layer
的 group_send()
data
可以随意改名,只要跟 send_data_to_frontend
的 event
格式对上就好。type
处理,比如把名字 send_data.to_frontend
处理成方法 send_data_to_frontend()
。import json
from channels.generic.websocket import AsyncWebsocketConsumer
class OrderConsumer(AsyncWebsocketConsumer):
room_name = ''
async def connect(self):
biz = self.scope['url_route']['kwargs']['biz']
self.room_name = f'order_{biz}'
await self.channel_layer.group_add(self.room_name, self.channel_name)
await self.accept()
async def disconnect(self, code):
await self.channel_layer.group_discard(self.room_name, self.channel_name)
async def receive(self, text_data=None, bytes_data=None):
await self.channel_layer.group_send(self.room_name, dict(
type='send_data_to_frontend',
data=text_data,
))
async def send_data_to_frontend(self, event):
await self.send(text_data=json.dumps(event['data'], ensure_ascii=False))
如果不使用异步,需要用 asgiref
库进行转换:
import json
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
class OrderConsumer(WebsocketConsumer):
room_name = ''
def connect(self):
biz = self.scope['url_route']['kwargs']['biz']
self.room_name = f'order_{biz}'
async_to_sync(self.channel_layer.group_add)(self.room_name, self.channel_name)
self.accept()
def disconnect(self, code):
async_to_sync(self.channel_layer.group_discard)(self.room_name, self.channel_name)
def receive(self, text_data=None, bytes_data=None):
async_to_sync(self.channel_layer.group_send)(self.room_name, dict(
type='send_data_to_frontend',
data=text_data,
))
def send_data_to_frontend(self, event):
self.send(text_data=json.dumps(event['data'], ensure_ascii=False))
这一段也可以放在 ./your_project/urls.py
里,但是需要将 urlpatterns
改名为 websocket_urlpatterns
,不能跟普通的 HTTP 路由 urlpatterns
混在一起。
from django.urls import re_path
from your_app.consumers import OrderConsumer
urlpatterns = [
re_path(r"ws/order/(?P<biz>[a-z0-9]{32})/$", OrderConsumer.as_asgi()),
]
import os
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application
from your_app.routings import urlpatterns
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AllowedHostsOriginValidator(
AuthMiddlewareStack(URLRouter(urlpatterns))
),
})
以及
ASGI_APPLICATION = 'your_project.asgi.application'
INSTALLED_APPS = [
'daphne',
...
]
不配置的时候默认用内存进行存储,但生产环境中不建议。推荐用
pip install channels-redis
然后配置
CHANNEL_LAYERS = {
"default": dict(
BACKEND="channels_redis.core.RedisChannelLayer",
CONFIG={
"hosts": [("127.0.0.1", 6379)],
},
),
}
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from rest_framework.decorators import api_view
@api_view(['POST'])
def callback(request, biz: str):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f'order_{order.biz}',
dict(
type='send_data_to_frontend',
data={ 'biz': biz, 'paid': True },
)
)