Django Channels:给 Django 加 WebSocket(不引入 Node)

起因

Django 项目要加"实时通知"功能(用户 like 时其他用户立刻看到 count 更新)。
两种思路:

  1. 起独立 Node.js WebSocket server + Django 通过 Redis pub/sub 协调
  2. Django Channels:在 Django 内部加 WebSocket / async

不想多维护一个 Node 服务 → Channels 直接。

Channels 是什么

Django 4.0+ 原生支持 ASGI,可以跑 async view。
Channels 是 Django Software Foundation 的官方扩展,处理:

  • WebSocket 协议(HTTP 升级)
  • 多 worker 间消息分发(channel layer,用 Redis)
  • SSE / 长连接

uv add channels channels-redis daphne

配置 ASGI

asgi.py

import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from django.urls import path

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
django_asgi_app = get_asgi_application()

from chat.consumers import ChatConsumer
from notifications.consumers import NotifyConsumer

application = ProtocolTypeRouter({
    'http': django_asgi_app,           # 普通 HTTP 走原本 Django
    'websocket': AuthMiddlewareStack(    # WebSocket 路由
        URLRouter([
            path('ws/chat/<str:room>/', ChatConsumer.as_asgi()),
            path('ws/notify/', NotifyConsumer.as_asgi()),
        ])
    ),
})

settings.py

INSTALLED_APPS = [
    'daphne',     # 替代 staticfiles 的 ASGI server,必须排前面
    'django.contrib.staticfiles',
    'channels',
    # ... 你的 apps
]

ASGI_APPLICATION = 'myapp.asgi.application'

# channel layer:跨 worker 消息
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {'hosts': [('redis', 6379)]},
    },
}

写一个简单 Consumer

# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room = self.scope['url_route']['kwargs']['room']
        self.group_name = f'chat_{self.room}'
        self.user = self.scope['user']

        if not self.user.is_authenticated:
            await self.close()
            return

        # 加入房间 group(Redis pub/sub)
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def receive(self, text_data):
        # 客户端发来消息
        data = json.loads(text_data)
        message = data['message']

        # 广播给房间所有人
        await self.channel_layer.group_send(
            self.group_name,
            {
                'type': 'chat.message',     # 调下面方法
                'message': message,
                'user': self.user.username,
            }
        )

    async def chat_message(self, event):
        # 房间收到消息时(包括自己发的)
        await self.send(text_data=json.dumps({
            'message': event['message'],
            'user': event['user'],
        }))

channel_layer.group_send 通过 Redis 通知所有 worker 的对应
consumer。3 个 daphne worker / 100 个客户端,消息正确广播。

客户端 JS

const ws = new WebSocket(`wss://example.com/ws/chat/${roomId}/`)

ws.onmessage = (e) => {
    const data = JSON.parse(e.data)
    addMessageToUI(data.user, data.message)
}

ws.onopen = () => {
    console.log('connected')
}

document.getElementById('send').addEventListener('click', () => {
    const text = document.getElementById('input').value
    ws.send(JSON.stringify({ message: text }))
})

# 开发用 runserver(自动支持 ASGI / Channels)
python manage.py runserver

# 生产用 daphne
daphne -b 0.0.0.0 -p 8000 myapp.asgi:application

# 多 worker
gunicorn -k uvicorn.workers.UvicornWorker myapp.asgi:application --workers 4

Redis 必须跑(channel layer 依赖)。

nginx 反代要支持 WebSocket:

location /ws/ {
    proxy_pass http://app;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
    proxy_read_timeout 86400;     # 长连接不要超时
}

从 Django view 推 message

业务 view 里触发广播(非 WebSocket 路径):

# views.py
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

def like_post(request, post_id):
    post = Post.objects.get(pk=post_id)
    post.likes += 1
    post.save()

    # 通知所有连接到这帖子的客户端
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        f'post_{post_id}',
        {
            'type': 'post.update',     # 调用 consumer 的 post_update 方法
            'likes': post.likes,
        }
    )
    return JsonResponse({'likes': post.likes})
class PostNotifyConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        post_id = self.scope['url_route']['kwargs']['post_id']
        self.group_name = f'post_{post_id}'
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def post_update(self, event):
        await self.send(text_data=json.dumps({
            'likes': event['likes'],
        }))

通用模式:业务 view 改 DB → group_send 通知 → 所有 WebSocket 客户端
收到

ORM in async consumer

WebSocket consumer 是 async,但 Django ORM 默认同步。
sync_to_async

from asgiref.sync import sync_to_async

class MyConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data):
        post = await sync_to_async(Post.objects.get)(pk=1)

或者用 Django 4.1+ 的 async ORM:

post = await Post.objects.aget(pk=1)

测试 Consumer

from channels.testing import WebsocketCommunicator
from chat.consumers import ChatConsumer

async def test_chat():
    communicator = WebsocketCommunicator(
        ChatConsumer.as_asgi(), '/ws/chat/test/')
    connected, _ = await communicator.connect()
    assert connected

    await communicator.send_json_to({'message': 'hello'})
    response = await communicator.receive_json_from()
    assert response['message'] == 'hello'

    await communicator.disconnect()

性能 / 规模

Channels + Redis 配置 4 worker 在小服务器上:

  • 1000 并发 WebSocket connection 轻松
  • 10000 取决于 Redis 配置 + 网络
  • 10w → 考虑专用 WebSocket server(Centrifugo / Phoenix Channels)

与替代品对比

Django Channels 单独 Node WebSocket Centrifugo Phoenix LiveView
语言 Python Node.js Go Elixir
集成 Django 原生 需 Redis 中介 N/A
并发上限 中(万级) 极高(百万) 极高
复杂度 高(双栈) 高(学语言)
适合 Django app + 适量 WebSocket 极致并发 + 现有 Node 中大规模 push 完全重 stack

真实部署 case

我们一个内部协作工具:

  • Django 4.2 + Channels 4
  • 500 实时在线协作 user
  • 用 1 台机器 (4 vCPU / 8 GB) + Redis
  • daphne 4 worker
  • nginx 反代
  • 6 个月 0 down

足够。如果上 5000 在线考虑 Centrifugo。

替代:HTMX + SSE

如果只是"服务端推" 不需要双向,用 SSE 替代(前面有篇):

@app.get('/sse/notifications')
async def sse():
    async def gen():
        async for event in get_events():
            yield f'data: {json.dumps(event)}\n\n'
    return StreamingHttpResponse(gen(), content_type='text/event-stream')

更简单 / 更标准 HTTP / Django 5 原生支持。
WebSocket 仍是双向场景(聊天 / 实时协作)的更优选。

踩过的坑

  1. daphne 没加进 INSTALLED_APPS:staticfiles 没替换 → runserver
    不识别 ASGI。

  2. channel layer 用 InMemory:单 worker OK;多 worker 时消息
    收不到(每 worker 独立 in-memory)。生产必用 RedisChannelLayer。

  3. WebSocket 鉴权:默认 scope['user'] 是 AnonymousUser。
    AuthMiddlewareStack 让 Django session cookie 生效。
    JWT 等其它 auth 要自己写 middleware。

  4. 客户端长时间不发消息断:默认 keepalive 没设,nginx / proxy
    一小时空闲 → 断。客户端定期发 ping:"" 空消息或者 protocol ping。

  5. async_to_sync + sync_to_async 混用 → ASGI ↔ WSGI 转换贵。
    尽量整个 path 同 async / 同 sync。

精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。