未分类
Python Django通过WebSSH操作Kubernetes Pod
通过Django调用k8s client来实现webssh登录: 涉及技术 Kubernetes Stream:接收数据执行,提供实时返回数据流 Django Channels:维持长连接,接收前端数据转给Kubernetes,同时将Kubernetes返回的数据发送给前端 xterm.js:一个前端终端组件,用于模拟Terminal的界面显示 基本的数据流向是:用户 --> xterm.js --> django channels --> kubernetes stream,接下来看看具体的代码实现 用到的模块:
1 2 3 4 5 6 7 8 9 10 11 |
apscheduler==3.6.3 Django==2.2.13 channels==2.3.1 channels_redis==2.4.1 paramiko==2.7.1 django-redis==4.10.0 requests==2.22.0 GitPython==3.0.8 python-ldap==3.2.0 openpyxl==3.0.3 kubernetes==7.0.0 |
关键文件: k8s.py:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# -*- coding=utf-8 -*-
"""
Helpers around the Kubernetes Python client for the web terminal / log viewer.

:author sean
"""
import json
import logging
import os
import threading

from kubernetes import client, config
from kubernetes.stream import stream
# from kubernetes.client import *
# from kubernetes.client.rest import ApiException

logger = logging.getLogger(__name__)


class KubernetesAPI(object):
    """Holds an authenticated ``ApiClient`` and lazily built API objects."""

    def __init__(self, api_host, ssl_ca_cert, key_file, cert_file):
        """
        Build the client configuration from certificate based credentials.

        :param api_host: URL of the Kubernetes API server
        :param ssl_ca_cert: path of the CA certificate file
        :param key_file: path of the client key file
        :param cert_file: path of the client certificate file
        """
        kub_conf = client.Configuration()
        kub_conf.host = api_host
        kub_conf.ssl_ca_cert = ssl_ca_cert
        kub_conf.cert_file = cert_file
        kub_conf.key_file = key_file
        self.api_client = client.ApiClient(configuration=kub_conf)
        self.client_core_v1 = client.CoreV1Api(api_client=self.api_client)
        self.client_apps_v1 = client.AppsV1Api(api_client=self.api_client)
        self.client_extensions_v1 = client.ExtensionsV1beta1Api(
            api_client=self.api_client)
        # Cache of API objects created on demand by __getattr__.
        self.api_dict = {}

    def __getattr__(self, item):
        """
        Resolve ``self.<SomeApi>`` to ``kubernetes.client.<SomeApi>`` bound to
        this instance's ``api_client``, creating and caching it on first use.

        :raises AttributeError: when ``kubernetes.client`` has no callable
            with that name, so ``hasattr``/``getattr(obj, name, default)``
            behave normally instead of silently receiving ``None``.
        """
        # Read the cache through __dict__: going through ``self.api_dict``
        # would re-enter __getattr__ forever if the cache is not set yet
        # (e.g. while the instance is being copied or unpickled).
        cache = self.__dict__.get('api_dict')
        if cache is not None:
            if item in cache:
                return cache[item]
            factory = getattr(client, item, None)
            if callable(factory):
                cache[item] = factory(api_client=self.api_client)
                return cache[item]
        raise AttributeError(
            f'{type(self).__name__!r} object has no attribute {item!r}')


class K8SClient(KubernetesAPI):
    """Pod level operations used by the websocket consumers."""

    def __init__(self, api_host, ssl_ca_cert, key_file, cert_file):
        super(K8SClient, self).__init__(
            api_host, ssl_ca_cert, key_file, cert_file)

    def terminal_start(self, namespace, pod_name, container, rows, cols):
        """
        Open an interactive exec stream into a container.

        :param namespace: namespace of the pod
        :param pod_name: name of the pod
        :param container: name of the container inside the pod
        :param rows: initial terminal height (anything ``int()`` accepts)
        :param cols: initial terminal width (anything ``int()`` accepts)
        :return: the stream object returned by ``kubernetes.stream.stream``
        :raises ValueError: when ``rows``/``cols`` cannot be converted to int
        """
        # Validate the size before opening the connection; failing afterwards
        # would leave an exec session open that nobody holds a reference to.
        initial_size = json.dumps({"Height": int(rows), "Width": int(cols)})

        # Prefer fish, then bash (wrapped in `script` when available),
        # and fall back to plain sh.
        command = [
            "/bin/sh",
            "-c",
            'TERM=xterm-256color; export TERM;'
            '([ -x /usr/bin/fish ] && exec /usr/bin/fish)'
            '|| ([ -x /bin/bash ] && ([ -x /usr/bin/script ] '
            '&& /usr/bin/script -q -c "/bin/bash" /dev/null || exec /bin/bash))'
            '|| exec /bin/sh']

        container_stream = stream(
            self.client_core_v1.connect_get_namespaced_pod_exec,
            name=pod_name,
            namespace=namespace,
            container=container,
            command=command,
            stderr=True, stdin=True,
            stdout=True, tty=True,
            _preload_content=False
        )
        # Channel 4 carries the terminal size message.
        container_stream.write_channel(4, initial_size)
        return container_stream

    def get_pod_log(self, namespace, pod_name, container, tail_lines=50):
        """
        Follow the log of a container.

        :param namespace: namespace of the pod
        :param pod_name: name of the pod
        :param container: name of the container inside the pod
        :param tail_lines: how many of the most recent lines to start with
        :return: an iterable of log chunks (the raw response's ``stream()``)
        """
        log_stream = self.client_core_v1.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            container=container,
            follow=True,
            pretty=True,
            _preload_content=False,
            timestamps=True,
            tail_lines=tail_lines
        ).stream()
        return log_stream


class K8SStreamThread(threading.Thread):
    """Pumps output of an exec stream to a websocket consumer."""

    def __init__(self, websocket, container_stream):
        super(K8SStreamThread, self).__init__()
        self.websocket = websocket
        self.stream = container_stream

    def run(self):
        """
        Forward stdout/stderr to the websocket until the stream closes.

        The websocket is closed exactly once when the loop ends, whether the
        stream finished normally or an error occurred. Previously an error
        closed the websocket but kept the loop spinning on a dead connection.
        """
        try:
            while self.stream.is_open():
                self.stream.update(timeout=1)
                if self.stream.peek_stdout():
                    stdout = self.stream.read_stdout()
                    if stdout:
                        self.websocket.send(
                            bytes_data=stdout.encode(encoding="utf-8"))
                elif self.stream.peek_stderr():
                    stderr = self.stream.read_stderr()
                    self.websocket.send(bytes_data=stderr.encode())
        except Exception:
            logger.warning('terminal stream forwarding stopped', exc_info=True)
        finally:
            self.websocket.close()


class K8SLogStreamThread(threading.Thread):
    """Pumps chunks of a followed pod log to a websocket consumer."""

    def __init__(self, websocket, container_stream):
        super(K8SLogStreamThread, self).__init__()
        self.websocket = websocket
        self.stream = container_stream

    def run(self):
        """Send every non-empty chunk, then close the websocket."""
        try:
            for chunk in self.stream:
                if chunk:
                    self.websocket.send(bytes_data=chunk)
        finally:
            # Also reached when sending fails, so the client is not left
            # with an open socket that will never receive data again.
            self.websocket.close()
consumer.py:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
import json
from threading import Thread
from urllib.parse import parse_qs

from channels.generic.websocket import WebsocketConsumer
from django.conf import settings
from django_redis import get_redis_connection

from apps.host.models import Host
from libs.k8s import K8SClient, K8SStreamThread, K8SLogStreamThread


class ExecConsumer(WebsocketConsumer):
    """Relays messages queued in redis under the URL token to the client."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.token = self.scope['url_route']['kwargs']['token']
        self.rds = get_redis_connection()

    def connect(self):
        self.accept()

    def disconnect(self, code):
        self.rds.close()

    def get_response(self):
        """Pop one queued message (waits up to 5s); ``None`` when empty."""
        response = self.rds.brpop(self.token, timeout=5)
        return response[1] if response else None

    def receive(self, **kwargs):
        """Drain the queue to the client, then answer with ``pong``."""
        response = self.get_response()
        while response:
            data = response.decode()
            self.send(text_data=data)
            response = self.get_response()
        self.send(text_data='pong')


class SSHConsumer(WebsocketConsumer):
    """Bridges a websocket to an interactive SSH shell on a host."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.user = self.scope['user']
        self.id = self.scope['url_route']['kwargs']['id']
        self.chan = None
        self.ssh = None

    def loop_read(self):
        """Forward shell output to the client until the channel is drained."""
        while True:
            data = self.chan.recv(32 * 1024)
            if not data:
                self.close(3333)
                break
            self.send(bytes_data=data)

    def receive(self, text_data=None, bytes_data=None):
        """Handle a JSON message: ``{"resize": [..]}`` or ``{"data": ".."}``."""
        data = text_data or bytes_data
        # The shell may not exist (setup failed / still closing): ignore input.
        if not data or self.chan is None:
            return
        data = json.loads(data)
        resize = data.get('resize')
        if resize and len(resize) == 2:
            self.chan.resize_pty(*resize)
        else:
            self.chan.send(data['data'])

    def disconnect(self, code):
        # Either may still be None when the permission check or the SSH
        # setup failed before a shell was opened.
        if self.chan is not None:
            self.chan.close()
        if self.ssh is not None:
            self.ssh.close()

    def connect(self):
        if self.user.has_host_perm(self.id):
            self.accept()
            self._init()
        else:
            self.close()

    def _init(self):
        """Open the SSH shell and start the reader thread."""
        self.send(bytes_data=b'Connecting ...\r\n')
        host = Host.objects.filter(pk=self.id).first()
        if not host:
            self.send(text_data='Unknown host\r\n')
            self.close()
            # Stop here: continuing would dereference ``None`` below.
            return
        try:
            self.ssh = host.get_ssh().get_client()
        except Exception as e:
            self.send(bytes_data=f'Exception: {e}\r\n'.encode())
            self.close()
            return
        self.chan = self.ssh.invoke_shell(term='xterm')
        self.chan.transport.set_keepalive(30)
        Thread(target=self.loop_read).start()


class TerminalConsumer(WebsocketConsumer):
    """Bridges a websocket to an exec session inside a Kubernetes container."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        route_kwargs = self.scope['url_route']['kwargs']
        self.namespace = route_kwargs['namespace']
        self.pod = route_kwargs['pod']
        self.container = route_kwargs['container']
        # Initial terminal size comes from ?rows=..&cols=.. ('' when absent).
        params = parse_qs(self.scope['query_string'].decode())
        self.rows = params.get('rows', [''])[0]
        self.cols = params.get('cols', [''])[0]
        self.kub = K8SClient(
            api_host=settings.K8S_API_HOST,
            ssl_ca_cert=settings.K8S_CA_CERT,
            key_file=settings.K8S_KEY_FILE,
            cert_file=settings.K8S_CERT_FILE
        )
        self.stream = None

    def connect(self):
        self.accept()
        self._init()

    def disconnect(self, close_code):
        # No stream when terminal_start failed in _init().
        if self.stream is None:
            return
        try:
            # Best effort: ask the remote shell to exit. The stream may
            # already be gone (shell exited first), which is fine.
            if self.stream.is_open():
                self.stream.write_stdin('exit\r')
        except Exception:
            pass
        finally:
            self.stream.close()

    def receive(self, text_data=None, bytes_data=None):
        """Handle a JSON message: ``{"resize": [rows, cols]}`` or ``{"data": ".."}``."""
        data = text_data or bytes_data
        if not data or self.stream is None:
            return
        data = json.loads(data)
        resize = data.get('resize')
        # Check the type first so a non-sequence value never reaches len().
        if isinstance(resize, list) and len(resize) == 2:
            rows, cols = resize
            self.stream.write_channel(
                4, json.dumps({"Height": int(rows), "Width": int(cols)}))
        else:
            self.stream.write_stdin(data['data'])

    def _init(self):
        """Open the exec stream and start the forwarding thread."""
        self.send(bytes_data=b'Connecting ...\r\n')
        try:
            self.stream = self.kub.terminal_start(
                self.namespace, self.pod, self.container, self.rows, self.cols)
        except Exception as e:
            self.send(bytes_data=f'Exception: {e}\r\n'.encode())
            self.close()
            return
        K8SStreamThread(self, self.stream).start()


class LogConsumer(WebsocketConsumer):
    """Streams the log of a Kubernetes container to a websocket."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        route_kwargs = self.scope['url_route']['kwargs']
        self.namespace = route_kwargs['namespace']
        self.pod = route_kwargs['pod']
        self.container = route_kwargs['container']
        self.kub = K8SClient(
            api_host=settings.K8S_API_HOST,
            ssl_ca_cert=settings.K8S_CA_CERT,
            key_file=settings.K8S_KEY_FILE,
            cert_file=settings.K8S_CERT_FILE
        )
        self.stream = None

    def connect(self):
        self.accept()
        self._init()

    def disconnect(self, close_code):
        # No stream when get_pod_log failed in _init().
        if self.stream is None:
            return
        try:
            self.stream.close()
        except Exception:
            # Best effort, as before: closing must not break disconnect.
            pass

    def send_message(self, data):
        """Pass ``data`` to the log stream when it is non-empty."""
        # The original re-assigned ``data`` from the undefined names
        # ``text_data``/``bytes_data`` and therefore always raised NameError.
        if data and self.stream is not None:
            self.stream.send(data)

    def _init(self):
        """Open the followed log stream and start the forwarding thread."""
        self.send(bytes_data=b'Connecting ...\r\n')
        try:
            self.stream = self.kub.get_pod_log(
                self.namespace, self.pod, self.container)
        except Exception as e:
            self.send(bytes_data=f'Exception: {e}\r\n'.encode())
            self.close()
            return
        K8SLogStreamThread(self, self.stream).start()
出现问题: 窗口自适应的问题: cat有中文字符的文件,会卡住,报错:UnicodeDecodeError: 'utf-8' codec can't decode bytes in position… 原因:是k8s client python sdk的一个bug, 解决方式是修改/root/.py_env/.virtualenvs/ansible/lib/python3.9/site-packages/kubernetes/stream/ws_client.py, 修改第179行data = data.decode("utf-8")为:data = data.decode("utf-8", "replace") 参考: https://github.com/kubernetes-client/python-base/issues/88 https://github.com/kubernetes-client/python-base/commit/15474efbaf906bf557c4a38392a0b06c95ce7841 打开webterminal页面后,容器所在机器的CPU负载会飙升,改成async异步也解决不了,官方sdk有个方法: /root/.py_env/.virtualenvs/ansible/lib/python3.9/site-packages/kubernetes/stream/ws_client.py: Read more…