未分类
Python Django通过WebSSH操作Kubernetes Pod
通过Django调用k8s client来实现webssh登录: 涉及技术 Kubernetes Stream:接收数据执行,提供实时返回数据流 Django Channels:维持长连接,接收前端数据转给Kubernetes,同时将Kubernetes返回的数据发送给前端 xterm.js:一个前端终端组件,用于模拟Terminal的界面显示 基本的数据流向是:用户 –> xterm.js –> django channels –> kubernetes stream,接下来看看具体的代码实现 用到的模块:
1 2 3 4 5 6 7 8 9 10 11 |
apscheduler==3.6.3 Django==2.2.13 channels==2.3.1 channels_redis==2.4.1 paramiko==2.7.1 django-redis==4.10.0 requests==2.22.0 GitPython==3.0.8 python-ldap==3.2.0 openpyxl==3.0.3 kubernetes==7.0.0 |
关键文件: k8s.py:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# -*- coding=utf-8 -*- """ :author sean """ import os import json import threading from kubernetes import client, config from kubernetes.stream import stream # from kubernetes.client import * # from kubernetes.client.rest import ApiException class KubernetesAPI(object): def __init__(self, api_host, ssl_ca_cert, key_file, cert_file): kub_conf = client.Configuration() kub_conf.host = api_host kub_conf.ssl_ca_cert = ssl_ca_cert kub_conf.cert_file = cert_file kub_conf.key_file = key_file self.api_client = client.ApiClient(configuration=kub_conf) self.client_core_v1 = client.CoreV1Api(api_client=self.api_client) self.client_apps_v1 = client.AppsV1Api(api_client=self.api_client) self.client_extensions_v1 = client.ExtensionsV1beta1Api( api_client=self.api_client) self.api_dict = {} def __getattr__(self, item): if item in self.api_dict: return self.api_dict[item] if hasattr(client, item) and callable(getattr(client, item)): self.api_dict[item] = getattr(client, item)( api_client=self.api_client) return self.api_dict[item] class K8SClient(KubernetesAPI): def __init__(self, api_host, ssl_ca_cert, key_file, cert_file): super(K8SClient, self).__init__( api_host, ssl_ca_cert, key_file, cert_file) def terminal_start(self, namespace, pod_name, container, rows, cols): command = [ "/bin/sh", "-c", 'TERM=xterm-256color; export TERM;' '([ -x /usr/bin/fish ] && exec /usr/bin/fish)' '|| ([ -x /bin/bash ] && ([ -x /usr/bin/script ] ' '&& /usr/bin/script -q -c "/bin/bash" /dev/null || exec /bin/bash))' '|| exec /bin/sh'] container_stream = stream( self.client_core_v1.connect_get_namespaced_pod_exec, name=pod_name, namespace=namespace, container=container, command=command, stderr=True, stdin=True, stdout=True, tty=True, _preload_content=False ) container_stream.write_channel(4, json.dumps({"Height": int(rows), "Width": int(cols)})) return container_stream def get_pod_log(self, namespace, pod_name, container, tail_lines=50): """ 获取pod的日志 :param tail_lines: # 显示最后多少行 :return: """ log_stream = self.client_core_v1.read_namespaced_pod_log( name=pod_name, namespace=namespace, container=container, follow=True, pretty=True, _preload_content=False, timestamps=True, tail_lines=tail_lines ).stream() return log_stream class K8SStreamThread(threading.Thread): def __init__(self, websocket, container_stream): super(K8SStreamThread, self).__init__() self.websocket = websocket self.stream = container_stream def run(self): while self.stream.is_open(): try: self.stream.update(timeout=1) if self.stream.peek_stdout(): stdout = self.stream.read_stdout() # print('stream stdout--->',stdout) if stdout: self.websocket.send(bytes_data=stdout.encode(encoding = "utf-8")) elif self.stream.peek_stderr(): stderr = self.stream.read_stderr() # print('stream stderr--->',stderr) self.websocket.send(bytes_data=stderr.encode()) except: self.websocket.close() else: self.websocket.close() class K8SLogStreamThread(threading.Thread): def __init__(self, websocket, container_stream): super(K8SLogStreamThread, self).__init__() self.websocket = websocket self.stream = container_stream def run(self): for s in self.stream: if s: self.websocket.send(bytes_data=s) else: self.websocket.close() |
consumer.py:
|
from channels.generic.websocket import WebsocketConsumer from django_redis import get_redis_connection from apps.host.models import Host from threading import Thread import json from libs.k8s import K8SClient, K8SStreamThread, K8SLogStreamThread from django.conf import settings from urllib.parse import parse_qs class ExecConsumer(WebsocketConsumer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.token = self.scope['url_route']['kwargs']['token'] self.rds = get_redis_connection() def connect(self): self.accept() def disconnect(self, code): self.rds.close() def get_response(self): response = self.rds.brpop(self.token, timeout=5) return response[1] if response else None def receive(self, **kwargs): response = self.get_response() while response: data = response.decode() self.send(text_data=data) response = self.get_response() self.send(text_data='pong') class SSHConsumer(WebsocketConsumer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.user = self.scope['user'] self.id = self.scope['url_route']['kwargs']['id'] self.chan = None self.ssh = None def loop_read(self): while True: data = self.chan.recv(32 * 1024) # print('read: {!r}'.format(data)) if not data: self.close(3333) break self.send(bytes_data=data) def receive(self, text_data=None, bytes_data=None): data = text_data or bytes_data if data: data = json.loads(data) # print('write: {!r}'.format(data)) resize = data.get('resize') if resize and len(resize) == 2: self.chan.resize_pty(*resize) else: self.chan.send(data['data']) def disconnect(self, code): self.chan.close() self.ssh.close() # print('Connection close') def connect(self): if self.user.has_host_perm(self.id): self.accept() self._init() else: self.close() def _init(self): self.send(bytes_data=b'Connecting ...\r\n') host = Host.objects.filter(pk=self.id).first() if not host: self.send(text_data='Unknown host\r\n') self.close() try: self.ssh = host.get_ssh().get_client() except Exception as e: self.send(bytes_data=f'Exception: {e}\r\n'.encode()) self.close() return self.chan = self.ssh.invoke_shell(term='xterm') self.chan.transport.set_keepalive(30) Thread(target=self.loop_read).start() class TerminalConsumer(WebsocketConsumer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # self.user = self.scope['user'] self.namespace = self.scope['url_route']['kwargs']['namespace'] self.pod = self.scope['url_route']['kwargs']['pod'] self.container = self.scope['url_route']['kwargs']['container'] query_string = self.scope['query_string'].decode() self.rows = parse_qs(query_string).get('rows', [''])[0] self.cols = parse_qs(query_string).get('cols', [''])[0] self.kub = K8SClient( api_host = settings.K8S_API_HOST, ssl_ca_cert = settings.K8S_CA_CERT, key_file = settings.K8S_KEY_FILE, cert_file = settings.K8S_CERT_FILE ) self.stream = None def connect(self): self.accept() self._init() def disconnect(self, close_code): self.stream.write_stdin('exit\r') self.stream.close() def receive(self, text_data=None, bytes_data=None): data = text_data or bytes_data if data: data = json.loads(data) # print('write: {!r}'.format(data)) resize = data.get('resize') if resize and len(resize) == 2 and isinstance(resize, list): rows = resize[0] cols = resize[1] self.stream.write_channel(4, json.dumps({"Height": int(rows), "Width": int(cols)})) else: self.stream.write_stdin(data['data']) def _init(self): self.send(bytes_data=b'Connecting ...\r\n') try: self.stream = self.kub.terminal_start(self.namespace, self.pod, self.container, self.rows, self.cols) except Exception as e: self.send(bytes_data=f'Exception: {e}\r\n'.encode()) self.close() return K8SStreamThread(self, self.stream).start() class LogConsumer(WebsocketConsumer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # self.user = self.scope['user'] self.namespace = self.scope['url_route']['kwargs']['namespace'] self.pod = self.scope['url_route']['kwargs']['pod'] self.container = self.scope['url_route']['kwargs']['container'] self.kub = K8SClient( api_host = settings.K8S_API_HOST, ssl_ca_cert = settings.K8S_CA_CERT, key_file = settings.K8S_KEY_FILE, cert_file = settings.K8S_CERT_FILE ) self.stream = None # self.tail_lines = 300 def connect(self): self.accept() self._init() def disconnect(self, close_code): try: # self.stream.write_stdin('exit\r') self.stream.close() except Exception as e: pass def send_message(self, data): data = text_data or bytes_data if data: self.stream.send(data) def _init(self): self.send(bytes_data=b'Connecting ...\r\n') try: self.stream = self.kub.get_pod_log(self.namespace, self.pod, self.container) except Exception as e: self.send(bytes_data=f'Exception: {e}\r\n'.encode()) self.close() return K8SLogStreamThread(self, self.stream).start() |
出现问题: 窗口自适应的问题: cat有中文字符的文件,会卡住,报错:UnicodeDecodeError: ‘utf-8’ codec can’t decode bytes in position… 原因:是k8s client python sdk的一个bug, 解决方式是修改/root/.py_env/.virtualenvs/ansible/lib/python3.9/site-packages/kubernetes/stream/ws_client.py, 修改第179行data = data.decode(“utf-8”)为:data = data.decode(“utf-8”, “replace”) 参考: https://github.com/kubernetes-client/python-base/issues/88 https://github.com/kubernetes-client/python-base/commit/15474efbaf906bf557c4a38392a0b06c95ce7841 打开webterminal页面后,容器所在机器的CPU负载会飙升,改成async异步也解决不了,官方sdk有个方法: /root/.py_env/.virtualenvs/ansible/lib/python3.9/site-packages/kubernetes/stream/ws_client.py: Read more…