通过Django调用k8s client来实现webssh登录:
涉及技术
- Kubernetes Stream:接收数据执行,提供实时返回数据流
- Django Channels:维持长连接,接收前端数据转给Kubernetes,同时将Kubernetes返回的数据发送给前端
- xterm.js:一个前端终端组件,用于模拟Terminal的界面显示
基本的数据流向是:用户 –> xterm.js –> django channels –> kubernetes stream,接下来看看具体的代码实现
用到的模块:
1 2 3 4 5 6 7 8 9 10 11 |
apscheduler==3.6.3 Django==2.2.13 channels==2.3.1 channels_redis==2.4.1 paramiko==2.7.1 django-redis==4.10.0 requests==2.22.0 GitPython==3.0.8 python-ldap==3.2.0 openpyxl==3.0.3 kubernetes==7.0.0 |
关键文件:
k8s.py:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# -*- coding=utf-8 -*- """ :author sean """ import os import json import threading from kubernetes import client, config from kubernetes.stream import stream # from kubernetes.client import * # from kubernetes.client.rest import ApiException class KubernetesAPI(object): def __init__(self, api_host, ssl_ca_cert, key_file, cert_file): kub_conf = client.Configuration() kub_conf.host = api_host kub_conf.ssl_ca_cert = ssl_ca_cert kub_conf.cert_file = cert_file kub_conf.key_file = key_file self.api_client = client.ApiClient(configuration=kub_conf) self.client_core_v1 = client.CoreV1Api(api_client=self.api_client) self.client_apps_v1 = client.AppsV1Api(api_client=self.api_client) self.client_extensions_v1 = client.ExtensionsV1beta1Api( api_client=self.api_client) self.api_dict = {} def __getattr__(self, item): if item in self.api_dict: return self.api_dict[item] if hasattr(client, item) and callable(getattr(client, item)): self.api_dict[item] = getattr(client, item)( api_client=self.api_client) return self.api_dict[item] class K8SClient(KubernetesAPI): def __init__(self, api_host, ssl_ca_cert, key_file, cert_file): super(K8SClient, self).__init__( api_host, ssl_ca_cert, key_file, cert_file) def terminal_start(self, namespace, pod_name, container, rows, cols): command = [ "/bin/sh", "-c", 'TERM=xterm-256color; export TERM;' '([ -x /usr/bin/fish ] && exec /usr/bin/fish)' '|| ([ -x /bin/bash ] && ([ -x /usr/bin/script ] ' '&& /usr/bin/script -q -c "/bin/bash" /dev/null || exec /bin/bash))' '|| exec /bin/sh'] container_stream = stream( self.client_core_v1.connect_get_namespaced_pod_exec, name=pod_name, namespace=namespace, container=container, command=command, stderr=True, stdin=True, stdout=True, tty=True, _preload_content=False ) container_stream.write_channel(4, json.dumps({"Height": int(rows), "Width": int(cols)})) return container_stream def get_pod_log(self, namespace, pod_name, container, tail_lines=50): """ 获取pod的日志 :param tail_lines: # 显示最后多少行 :return: """ log_stream = self.client_core_v1.read_namespaced_pod_log( name=pod_name, namespace=namespace, container=container, follow=True, pretty=True, _preload_content=False, timestamps=True, tail_lines=tail_lines ).stream() return log_stream class K8SStreamThread(threading.Thread): def __init__(self, websocket, container_stream): super(K8SStreamThread, self).__init__() self.websocket = websocket self.stream = container_stream def run(self): while self.stream.is_open(): try: self.stream.update(timeout=1) if self.stream.peek_stdout(): stdout = self.stream.read_stdout() # print('stream stdout--->',stdout) if stdout: self.websocket.send(bytes_data=stdout.encode(encoding = "utf-8")) elif self.stream.peek_stderr(): stderr = self.stream.read_stderr() # print('stream stderr--->',stderr) self.websocket.send(bytes_data=stderr.encode()) except: self.websocket.close() else: self.websocket.close() class K8SLogStreamThread(threading.Thread): def __init__(self, websocket, container_stream): super(K8SLogStreamThread, self).__init__() self.websocket = websocket self.stream = container_stream def run(self): for s in self.stream: if s: self.websocket.send(bytes_data=s) else: self.websocket.close() |
consumer.py:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
from channels.generic.websocket import WebsocketConsumer from django_redis import get_redis_connection from apps.host.models import Host from threading import Thread import json from libs.k8s import K8SClient, K8SStreamThread, K8SLogStreamThread from django.conf import settings from urllib.parse import parse_qs class ExecConsumer(WebsocketConsumer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.token = self.scope['url_route']['kwargs']['token'] self.rds = get_redis_connection() def connect(self): self.accept() def disconnect(self, code): self.rds.close() def get_response(self): response = self.rds.brpop(self.token, timeout=5) return response[1] if response else None def receive(self, **kwargs): response = self.get_response() while response: data = response.decode() self.send(text_data=data) response = self.get_response() self.send(text_data='pong') class SSHConsumer(WebsocketConsumer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.user = self.scope['user'] self.id = self.scope['url_route']['kwargs']['id'] self.chan = None self.ssh = None def loop_read(self): while True: data = self.chan.recv(32 * 1024) # print('read: {!r}'.format(data)) if not data: self.close(3333) break self.send(bytes_data=data) def receive(self, text_data=None, bytes_data=None): data = text_data or bytes_data if data: data = json.loads(data) # print('write: {!r}'.format(data)) resize = data.get('resize') if resize and len(resize) == 2: self.chan.resize_pty(*resize) else: self.chan.send(data['data']) def disconnect(self, code): self.chan.close() self.ssh.close() # print('Connection close') def connect(self): if self.user.has_host_perm(self.id): self.accept() self._init() else: self.close() def _init(self): self.send(bytes_data=b'Connecting ...\r\n') host = Host.objects.filter(pk=self.id).first() if not host: self.send(text_data='Unknown host\r\n') self.close() try: self.ssh = host.get_ssh().get_client() except Exception as e: self.send(bytes_data=f'Exception: {e}\r\n'.encode()) self.close() return self.chan = self.ssh.invoke_shell(term='xterm') self.chan.transport.set_keepalive(30) Thread(target=self.loop_read).start() class TerminalConsumer(WebsocketConsumer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # self.user = self.scope['user'] self.namespace = self.scope['url_route']['kwargs']['namespace'] self.pod = self.scope['url_route']['kwargs']['pod'] self.container = self.scope['url_route']['kwargs']['container'] query_string = self.scope['query_string'].decode() self.rows = parse_qs(query_string).get('rows', [''])[0] self.cols = parse_qs(query_string).get('cols', [''])[0] self.kub = K8SClient( api_host = settings.K8S_API_HOST, ssl_ca_cert = settings.K8S_CA_CERT, key_file = settings.K8S_KEY_FILE, cert_file = settings.K8S_CERT_FILE ) self.stream = None def connect(self): self.accept() self._init() def disconnect(self, close_code): self.stream.write_stdin('exit\r') self.stream.close() def receive(self, text_data=None, bytes_data=None): data = text_data or bytes_data if data: data = json.loads(data) # print('write: {!r}'.format(data)) resize = data.get('resize') if resize and len(resize) == 2 and isinstance(resize, list): rows = resize[0] cols = resize[1] self.stream.write_channel(4, json.dumps({"Height": int(rows), "Width": int(cols)})) else: self.stream.write_stdin(data['data']) def _init(self): self.send(bytes_data=b'Connecting ...\r\n') try: self.stream = self.kub.terminal_start(self.namespace, self.pod, self.container, self.rows, self.cols) except Exception as e: self.send(bytes_data=f'Exception: {e}\r\n'.encode()) self.close() return K8SStreamThread(self, self.stream).start() class LogConsumer(WebsocketConsumer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # self.user = self.scope['user'] self.namespace = self.scope['url_route']['kwargs']['namespace'] self.pod = self.scope['url_route']['kwargs']['pod'] self.container = self.scope['url_route']['kwargs']['container'] self.kub = K8SClient( api_host = settings.K8S_API_HOST, ssl_ca_cert = settings.K8S_CA_CERT, key_file = settings.K8S_KEY_FILE, cert_file = settings.K8S_CERT_FILE ) self.stream = None # self.tail_lines = 300 def connect(self): self.accept() self._init() def disconnect(self, close_code): try: # self.stream.write_stdin('exit\r') self.stream.close() except Exception as e: pass def send_message(self, data): data = text_data or bytes_data if data: self.stream.send(data) def _init(self): self.send(bytes_data=b'Connecting ...\r\n') try: self.stream = self.kub.get_pod_log(self.namespace, self.pod, self.container) except Exception as e: self.send(bytes_data=f'Exception: {e}\r\n'.encode()) self.close() return K8SLogStreamThread(self, self.stream).start() |
出现问题:
- 窗口自适应的问题:
- cat有中文字符的文件,会卡住,报错:UnicodeDecodeError: ‘utf-8’ codec can’t decode bytes in position…
- 原因:是k8s client python sdk的一个bug, 解决方式是修改/root/.py_env/.virtualenvs/ansible/lib/python3.9/site-packages/kubernetes/stream/ws_client.py, 修改第179行data = data.decode(“utf-8”)为:data = data.decode(“utf-8”, “replace”)
- 参考:
- https://github.com/kubernetes-client/python-base/issues/88
- https://github.com/kubernetes-client/python-base/commit/15474efbaf906bf557c4a38392a0b06c95ce7841
- 原因:是k8s client python sdk的一个bug, 解决方式是修改/root/.py_env/.virtualenvs/ansible/lib/python3.9/site-packages/kubernetes/stream/ws_client.py, 修改第179行data = data.decode(“utf-8”)为:data = data.decode(“utf-8”, “replace”)
- 打开webterminal页面后,容器所在机器的CPU负载会飙升,改成async异步也解决不了,官方sdk有个方法:
/root/.py_env/.virtualenvs/ansible/lib/python3.9/site-packages/kubernetes/stream/ws_client.py:
-
-
-
123456789101112131415161718192021222324252627282930def update(self, timeout=0):"""Update channel buffers with at most one complete frame of input."""if not self.is_open():returnif not self.sock.connected:self._connected = Falsereturnr, _, _ = select.select((self.sock.sock, ), (), (), timeout)if r:op_code, frame = self.sock.recv_data_frame(True)if op_code == ABNF.OPCODE_CLOSE:self._connected = Falsereturnelif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT:data = frame.dataif six.PY3:data = data.decode("utf-8", "replace")if len(data) > 1:channel = ord(data[0])data = data[1:]if data:if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]:# keeping all messages in the order they received# for non-blocking call.self._all.write(data)if channel not in self._channels:self._channels[channel] = dataelse:self._channels[channel] += data
-
- 解决: 在方法K8SStreamThread 里加上self.stream.update(timeout=1),当有最多一个完整的输入帧时更新通道缓冲区
- 参考: https://github.com/kubernetes-client/python/tree/release-12.0/examples
-
websocket原理:
Django WebSocket (一):WebSocket 概念及原理
WebSocket
基本概念
- WebSocket允许服务端主动向客户端推送数据
- 在WebSocket协议中
- 客户端浏览器和服务器只需要一次握手就可以创建持久性的连接
- 并在浏览器和服务器之间进行双向的数据传输
- HTTP、WebSocket
- 都是建立在TCP连接基础之上
- websocket协议是通过http协议来建立传输层的TCP连接

请求头
Connection
、Upgrade
:表示客户端发起的是WebSocket请求Sec-WebSocket-Version
:客户端所使用的的WebSocket协议版本号,服务端会确认是否支持该版本号Sec-WebSocket-Key
:一个Base64编码值,有浏览器随机生成,用于升级requestSec-WebSocket-Extensions
:客户端想要表达的协议级的扩展
响应头
HTTP/1.1 101 Switching Protocols
:切换协议,WebSocket协议通过HTTP协议来建立运输层的TCP连接Connection
和Upgrade
:表示服务端返回的是WebSocket响应
连接过程
目的:用户能够实时接受信息
JavaScript轮询
- 客户端发送GET,请求,服务器返回页面数据
- 客户端不停地发送请求,询问服务器是否有数据返回,服务器一直响应请求
- 这种方式,
服务器会浪费很多资源

WebSocket连接
- 1 .客户端通过HTTP协议发送GET请求,携带
Connection
和Upgrade
,指定要将HTTP协议升级成WebSocket协议 - 2 .服务器给客户端返回
HTTP/1.1 101 Switching Protocols
,表示协议切换成功 - 3 .服务端与客户端进行通信

优缺点及应用场景
- 优点:
- 支持双向通信,实时性更强
- 数据格式比较轻量,性能开销小,通信高效
- 支持扩展。用户可以扩展协议或者实现自定义的自协议(比如支持自定义压缩算法)
- 缺点
- 少部分浏览器不支持,浏览器支持的程度和方式有区别
- 长连接对后端处理业务的代码稳定性要求更高,后端推送功能相对复杂
- 成熟的HTTP生态下有大量的组件可以使用,WebSocket较少
- 应用场景
- 及时聊天通信,网站消息通知
- 在线协同编辑,如腾讯文档
- 多玩家在线游戏,视屏弹幕、股票基金实时报价
Django 使用 WebSocket
- 需要解决的问题
- 如何分辨路由(HTTP请求、WebSocket请求)
- 如何兼容Django的认证系统
- 如何接受和推送WebSocket消息
- 如何通过ORM保存和获取数据
- 由以上问题便引出我们的Django Channels
Django Channels
Channels是一个为Django提供异步扩展的库,通常主要用来提供WebSocket支持和后台任务。
网络架构
Protocol Type Router
:协议类型解析器,对不同类型的协议进行解析
处理请求
Django 请求
- Django是一个同步框架:接收到请求之后,必须处理完这个请求,返回了之后,浏览器才能显示加载完毕
- 业务场景:Django处理一个时间很长的请求
- 面临的问题
- 客户端浏览器会一直等待
- Nginx会超时(TimeOut),关闭连接
- 可以考虑的方案
- 使用celery,将比较耗时的任务传递给celery,进行异步处理;Django正常返回
- 问题:
celery
执行是否成功/执行成功之后的结果,都无法再通过Django主动发送给前端
- 面临的问题
Django Channels
interface server
:接口服务器,负责对协议进行解析,将不同协议分发到不同的ChannelChannel Leyer
:频道层,可以是一个FIFO(先进先出,first in;first out)队列,通常使用Redisconsumer
:消费者,接受和处理消息
Channels中文件和配置的含义
- 配置步骤
- 在
INSTALLED_APPS
中注册Channels应用
- 在
在settings.py的配置文件中,添加CHANNEL_LAYERS
缓存
1 2 3 4 5 6 |
from channels.layers import get_channel_layer channel_layer = get_channel_layer() # ChannelLayerManager() 频道层管理器实例 ==> 相当于ORM中查询集的管理器QuerySet # 频道层的API channel_layer.group_add("第一个参数是组名", "第二个参数是频道的名字") channel_layer.group_discard() # 离开某一个组 |
- asgi.py:介于网络协议服务和Python应用之间的标准接口,能够处理多重通用类型协议,包括HTTP、HTTP2和WebSocket
- channel_layers:在settings.py中配置,类似于一个通道,发送者(producer)在一段发送消息,消费者(consumer)在另一端监听
- routungs.py:相当于Django中的urls.py
- consumers.py:相当于Django中的views.py
- views.py: 用来开发
符合WSGI
规范的应用 - consumers.py: 用来开发
符合ASGI
接口规范的Python应用
- views.py: 用来开发
WSGI 和 ASGI
- WSGI:
- 为Python语言定义的Web服务器和Web应用程序或框架之前的一种简单而通用的接口
- uWSGI:一个Web服务器, 可安装的软件,提供服务的
- WSGI:一套标准,(HTTP,HTTPS)
- ASGI:
- 异步服务网关接口,一个介于网络协议器和Python之间的标准接口
- 能够处理多种通用的协议类型,包括HTTP、HTTPS和WebSocket
- 部署:
- HTTP、HTTP2:Nginx/Apache + WSGI(uWSGI) + Django/Flask/Python3
- HTTP、HTTP2、WebSocket:Nginx/Apache + ASGI)(Daphne)+ Django/Flask/Python3
- 区别
- WSGI:基于HTTP协议模式的,不支持WebSocket
- ASGI:就是为了支持Python常用的WSGI所不支持的新的协议标准
- 即:
ASGI是WSGI的扩展,而且能通过asyncio异步运行
Consumer的使用
- event loop:事件循环
- event handler:事件处理
- sync:同步
- async:异步
scope:在ASGI接口规范中定义了,相当于WSGI中的request
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
from channels.consumer import SyncConsumer class EchoConsumer(SyncConsumer): """同步的Consumer""" def websocket_connect(self, event): """ 建立连接 event: 连接的事件 """ self.send({ # 这里的这个字典的key是固定的 # 这里写的是一个字符串,但是对应的是websocket.accept() 这个方法 # -->接收websocket的连接 "type": "websocket.accept", }) def websocket_receive(self, event): """ 接受消息 event:接受的事件 """ self.send({ # 这里写的是一个字符串,但是对应的是websocket.send() 这个方法 # --> 从后端主动发送websocket的消息 "type": "websocket.send", "text": event["text"] # 后端返回给前端的数据 }) |
ORM 同步到异步
什么时候用sync什么时候用async 异步的代码里面不要写入同步的逻辑,否则起不到真正意义上的异步并发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
class EchoAsyncConsumer(AsyncConsumer): async def websocket_receive(self, event): # 如果在异步的逻辑里面,出现同步的代码 --> 当前方法的事件循环会卡住,这时候就要把同步代码改成异步代码 # ORM同步到异步 user = User.objects.get(username=username) from channels.db import database_sync_to_async # 方式一: # user = await database_sync_to_async(user = User.objects.get(username=username)) # 方式二: @database_sync_to_async def get_user(username): return user = User.objects.get(username=username) |
socp
在ASGI接口规范中定义了,相当于WSGI中的request
1 2 3 4 |
user = self.scope["user"] path = self.scope["path"] # Request请求的路径==> HTTP/WebSocket header = self.scope["headers"] method = self.scope["method"] # 注意:这个只针对于HTTP请求 |
Generic Consumer同步与异步通信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
from channels.generic.websocket import WebsocketConsumer, AsyncWebsocketConsumer class MyConsumer(WebsocketConsumer): """WebsocketConsumer:对SyncConsumer的进一步封装""" def connect(self): """同步,接收连接""" # self.accept() 如果不传参,表示接受websocket的连接 # self.accept() 传参,subprotocol==>自定义的子协议 self.accept(subprotocol="you protocol") # 拒绝连接,给客户端发送一个状态码403,表示权限错误 self.close(code=403) def receive(self, text_data=None, bytes_data=None): """接收数据""" self.send(text_data="") # 返回文本 self.send(bytes_data="") # 把字符串转换成二进制的帧返回 self.close() def disconnect(self, code): """断开连接""" pass class MyAsyncConsumer(AsyncWebsocketConsumer): """ 把上面同步的Consumer变成异步的Consumer 步骤:def()前面添加async,函数内部调用的方法前面添加await """ async def connect(self): """同步,接收连接""" await self.accept(subprotocol="you protocol") await self.close(code=403) async def receive(self, text_data=None, bytes_data=None): """接收数据""" await self.send(text_data="") await self.send(bytes_data="") await self.close() async def disconnect(self, code): """断开连接""" pass |
Channels的路由Routing开发
- ProtocolTypeRouter:协议类型解析
- self.scope[‘type’]获取协议类型
- self.scope[‘url_route’][‘kwargs’][‘username’]获取url中关键字参数
channels routing是scope级别的,一个连接只能由一个consumer接收和处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
from channels.auth import AuthMiddlewareStack from channels.routing import ProtocolTypeRouter, URLRouter from channels.security.websocket import AllowedHostsOriginValidator from django.urls import path from zanhu.messager.consumers import MessagesConsumer application = ProtocolTypeRouter({ # 'http': # 普通的HTTP请求不需要我们手动在这里添加,框架会自动加载 'websocket': # 使用AllowedHostsOriginValidator,允许的访问的源站与settings.py文件中的ALLOWED_HOSTS相同 AllowedHostsOriginValidator( # 认证中间件站(兼容Django认证系统): # 用于WebSocket认证,集成了CookieMiddleware, SessionMiddleware, AuthMiddleware AuthMiddlewareStack( # URL路由 URLRouter([ # URL路由匹配 path('ws/notifications/', NotificationsConsumer), path('ws/<str:username>/', MessagesConsumer), ]) ) ) }) |
- OriginValidator、AllowedHostsOriginValidator可以防止通过WebSocket进行CSRF攻击
OriginValidator需要手动添加允许访问的源站,如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
from channels.security.websocket import OriginValidator application = ProtocolTypeRouter({ 'websocket': OriginValidator( AuthMiddlewareStack( URLRouter([ ... ]) ), # 第二个参数,手动添加的允许访问的源站 [".imooc.com", "http://.imooc.com:80", "http://muke.site.com"] ) }) |
安装:pipenv install channels-redis
后端开发
注册应用
1 2 3 4 5 6 |
INSTALLED_APPS = ( 'django.contrib.auth', 'django.contrib.contenttypes', ... 'channels', # 添加channels ) |
ASGI配置
asgi.py
配置asgi的时候,需要结合当前文件的目录结构,==>
wsgi.py同级目录
1 2 3 4 5 6 7 8 9 10 11 12 |
import os import sys import django from channels.routing import get_default_application # application加入查找路径中 app_path = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir)) sys.path.append(os.path.join(app_path, 'zanhu')) # ../project/project,应用的路径 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.production") django.setup() application = get_default_application() |
setting配置
在settings.py中添加
1 2 3 |
... ASGI_APPLICATION = "myproject.asgi.application" ... |
Consumer配置
Channels的原理中说了:consumer.py相当于Django中的view.py 用来处理请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
import json from channels.generic.websocket import AsyncWebsocketConsumer class MessagesConsumer(AsyncWebsocketConsumer): """处理私信应用中WebSocket请求""" # 定义方法的地方,前面添加 async # 调用方法的地方,前面添加 await async def connect(self): """WebSocket连接""" # 校验用户是否是合法用户 if self.scope['user'].is_anonymous: # is_anonymous # 未登录的用户拒绝连接 await self.close() else: # 加入聊天组,监听频道 # channel_layer ==> get_channel_layer() ==> ChannelLayerManager() 频道层管理器实例 # 每两个私信的人建立一个聊天组 # group_add("第一个参数是组名", "第二个参数是频道的名字") # 频道名字,使用默认就可以:"%s.%s!%s" % (prefix, self.client_prefix, uuid.uuid4().hex,) await self.channel_layer.group_add(self.scope['user'].username, self.channel_name) # 接受WebSocket连接 await self.accept() async def receive(self, text_data=None, bytes_data=None): """接收私信""" await self.send(text_data=json.dumps(text_data)) async def disconnect(self, code): """离开聊天组""" # 把当前用户从当前监听的频道组里面移除 await self.channel_layer.group_discard(self.scope['user'].username, self.channel_name) |
Routing配置
- 这里就相当于Django中的urls.py
- 主要用来匹配不同的请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
from channels.auth from django.urls import pathimport AuthMiddlewareStack from channels.routing import ProtocolTypeRouter, URLRouter from channels.security.websocket import AllowedHostsOriginValidator from proejct.messager.consumers import MessagesConsumer from proejct.notifications.consumers import NotificationsConsumer application = ProtocolTypeRouter({ 'websocket': AllowedHostsOriginValidator( AuthMiddlewareStack( URLRouter([ path('ws/<str:username>/', MessagesConsumer), ]) ) ) }) |
前端代码
WebSocket基本使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
// WebSocket构造函数,用于新建WebSocket实例 var ws = new WebSocket('websocket地址', '请求类型'); ws.readyState:返回实例对象当前的状态 CONNNECTING: 值为0, 表示正在连接 OPEN: 值为1, 表示连接成功, 可以通信了 CLOSING: 值为2, 表示连接正在关闭 CLOSED: 值为3, 表示连接已关闭, 或者打开连接失败 // ws.readyState:实例对象的使用 switch (ws.readyState) { case ws.CONNECTING: // break; case ws.OPEN: // break; case ws.CLOSING: // break; case ws.CLOSED: // break; default: // ... break; } // ws.onopen 用于指定连接成功后的回调函数 ws.onopen = function () { ws.send('连接成功!') }; // ws.onclose 用于指定连接关闭后的回调函数 ws.onclose = function () { ws.send('连接关闭!') }; // ws.onmessage 用于指定收到服务器数据后的回调函数 ws.onmessage = function (event) { if (typeof event.data === String) { console.log("received string") } else { console.log("xxx") } }; // ws.send() // 发送数据内容 // ws.onerror 指定报错时的回调函数 ws.onerror = function (event) { // 报错处理 }; |
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
$(function () { // 滚动条下拉到底 function scrollConversationScreen() { $("input[name='message']").focus(); $('.messages-list').scrollTop($('.messages-list')[0].scrollHeight); } // AJAX POST发送消息 $("#send").submit(function () { $.ajax({ url: '/messages/send-message/', data: $("#send").serialize(), cache: false, type: 'POST', success: function (data) { $(".send-message").before(data); // 将接收到的消息插入到聊天框 $("input[name='message']").val(''); // 消息发送框置为空 scrollConversationScreen(); // 滚动条下拉到底 } }); return false; }); // const:固定常量 // "https:" ? "wss" : "ws" ==> JS中的三元运算符 // WebSocket连接,使用wss(https)或者ws(http) const ws_scheme = window.location.protocol === "https:" ? "wss" : "ws"; const ws_path = ws_scheme + "://" + window.location.host + "/ws/" + currentUser + "/"; const ws = new ReconnectingWebSocket(ws_path); // 监听后端发送过来的消息 // event 监听事件 ws.onmessage = function (event) { const data = JSON.parse(event.data); if (data.sender === activeUser) { // 发送者为当前选中的用户 $(".send-message").before(data.message); // 将接收到的消息插入到聊天框 scrollConversationScreen(); // 滚动条下拉到底 } } }); |
参考: https://zhengxingtao.com/article/125/
更多知识:
https://segmentfault.com/a/1190000023402628
https://www.neerajbyte.com/post/how-to-implement-websocket-in-django-using-channels-and-stream-websocket-data
https://segmentfault.com/a/1190000023402628
0 Comments