from channels.generic.websocket import WebsocketConsumer
from django_redis import get_redis_connection
from apps.host.models import Host
from threading import Thread
import json
from libs.k8s import K8SClient, K8SStreamThread, K8SLogStreamThread
from django.conf import settings
from urllib.parse import parse_qs
class ExecConsumer(WebsocketConsumer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.token = self.scope['url_route']['kwargs']['token']
self.rds = get_redis_connection()
def connect(self):
self.accept()
def disconnect(self, code):
self.rds.close()
def get_response(self):
response = self.rds.brpop(self.token, timeout=5)
return response[1] if response else None
def receive(self, **kwargs):
response = self.get_response()
while response:
data = response.decode()
self.send(text_data=data)
response = self.get_response()
self.send(text_data='pong')
class SSHConsumer(WebsocketConsumer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user = self.scope['user']
self.id = self.scope['url_route']['kwargs']['id']
self.chan = None
self.ssh = None
def loop_read(self):
while True:
data = self.chan.recv(32 * 1024)
# print('read: {!r}'.format(data))
if not data:
self.close(3333)
break
self.send(bytes_data=data)
def receive(self, text_data=None, bytes_data=None):
data = text_data or bytes_data
if data:
data = json.loads(data)
# print('write: {!r}'.format(data))
resize = data.get('resize')
if resize and len(resize) == 2:
self.chan.resize_pty(*resize)
else:
self.chan.send(data['data'])
def disconnect(self, code):
self.chan.close()
self.ssh.close()
# print('Connection close')
def connect(self):
if self.user.has_host_perm(self.id):
self.accept()
self._init()
else:
self.close()
def _init(self):
self.send(bytes_data=b'Connecting ...\r\n')
host = Host.objects.filter(pk=self.id).first()
if not host:
self.send(text_data='Unknown host\r\n')
self.close()
try:
self.ssh = host.get_ssh().get_client()
except Exception as e:
self.send(bytes_data=f'Exception: {e}\r\n'.encode())
self.close()
return
self.chan = self.ssh.invoke_shell(term='xterm')
self.chan.transport.set_keepalive(30)
Thread(target=self.loop_read).start()
class TerminalConsumer(WebsocketConsumer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# self.user = self.scope['user']
self.namespace = self.scope['url_route']['kwargs']['namespace']
self.pod = self.scope['url_route']['kwargs']['pod']
self.container = self.scope['url_route']['kwargs']['container']
query_string = self.scope['query_string'].decode()
self.rows = parse_qs(query_string).get('rows', [''])[0]
self.cols = parse_qs(query_string).get('cols', [''])[0]
self.kub = K8SClient(
api_host = settings.K8S_API_HOST,
ssl_ca_cert = settings.K8S_CA_CERT,
key_file = settings.K8S_KEY_FILE,
cert_file = settings.K8S_CERT_FILE
)
self.stream = None
def connect(self):
self.accept()
self._init()
def disconnect(self, close_code):
self.stream.write_stdin('exit\r')
self.stream.close()
def receive(self, text_data=None, bytes_data=None):
data = text_data or bytes_data
if data:
data = json.loads(data)
# print('write: {!r}'.format(data))
resize = data.get('resize')
if resize and len(resize) == 2 and isinstance(resize, list):
rows = resize[0]
cols = resize[1]
self.stream.write_channel(4, json.dumps({"Height": int(rows), "Width": int(cols)}))
else:
self.stream.write_stdin(data['data'])
def _init(self):
self.send(bytes_data=b'Connecting ...\r\n')
try:
self.stream = self.kub.terminal_start(self.namespace, self.pod, self.container, self.rows, self.cols)
except Exception as e:
self.send(bytes_data=f'Exception: {e}\r\n'.encode())
self.close()
return
K8SStreamThread(self, self.stream).start()
class LogConsumer(WebsocketConsumer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# self.user = self.scope['user']
self.namespace = self.scope['url_route']['kwargs']['namespace']
self.pod = self.scope['url_route']['kwargs']['pod']
self.container = self.scope['url_route']['kwargs']['container']
self.kub = K8SClient(
api_host = settings.K8S_API_HOST,
ssl_ca_cert = settings.K8S_CA_CERT,
key_file = settings.K8S_KEY_FILE,
cert_file = settings.K8S_CERT_FILE
)
self.stream = None
# self.tail_lines = 300
def connect(self):
self.accept()
self._init()
def disconnect(self, close_code):
try:
# self.stream.write_stdin('exit\r')
self.stream.close()
except Exception as e:
pass
def send_message(self, data):
data = text_data or bytes_data
if data:
self.stream.send(data)
def _init(self):
self.send(bytes_data=b'Connecting ...\r\n')
try:
self.stream = self.kub.get_pod_log(self.namespace, self.pod, self.container)
except Exception as e:
self.send(bytes_data=f'Exception: {e}\r\n'.encode())
self.close()
return
K8SLogStreamThread(self, self.stream).start()