django 通过websocket查看kubernetes容器日志

发布 : 2023-03-03 分类 : 开发 浏览 :

背景

最近在写kubernetes管理平台,需要通过长连接的方式获取容器日志,并在前端展示。

后端代码

需要安装的库

1
pip install django channels channels_redis kubernetes

创建django项目就略过了,日志相关的app名为log
还需要准备一个redis 安装过程跳过

首先修改settings.py

注意内容中有注释的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'channels', # 新增
'log' # 新增
]

# 设置ASGI应用
ASGI_APPLICATION = 'k8sDashboard.asgi.application'

# channels layers 配置,需要用到redis
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [("127.0.0.1", 6379)],
# 或"hosts": [os.environ.get('REDIS_URL', 'redis://127.0.0.1:6379/1')],
},
},
}
# 设置通道层的通信后台 - 本地测试用,线上不建议使用这种方式,使用上方的redis作为后端
# CHANNEL_LAYERS = {
# "default": {
# "BACKEND": "channels.layers.InMemoryChannelLayer"
# }
# }

修改asgi.py

在这里,channels的ProtocolTypeRouter会根据请求协议的类型来转发请求。AuthMiddlewareStack将使用对当前经过身份验证的用户的引用来填充连接的scope, 类似于 Django 的request对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
"""
ASGI config for k8sDashboard project.

It exposes the ASGI callable as a module-level variable named ``application``.

For more information on this file, see
https://docs.djangoproject.com/en/3.2/howto/deployment/asgi/
"""

import os
import log.routing

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'k8sDashboard.settings')

application = ProtocolTypeRouter({
"http": get_asgi_application(),
# Just HTTP for now. (We can add other protocols later.)
"websocket": AuthMiddlewareStack(
URLRouter(
log.routing.websocket_urlpatterns
)
),

})

# application = get_asgi_application()

什么是ASGI?

Django默认是使用WSGI,WSGI全称是Web Server Gateway Interface,Web服务网关接口,用来描述Web 服务器如何与Web 应用通信的规范,是不支持Websocket的。而ASGI是异步服务网关接口, 一个介于网络协议服务和Python应用之间的标准接口,能够处理多种通用的协议类型,包括HTTP,HTTP2和WebSocket。

新增 routing.py

此文件我放在了log app目录下,此文件是websocket 的路由文件,等同于http路由的 url.py

1
2
3
4
5
6
from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
re_path(r'^ws/log/(?P<namespace>[a-z0-9_-]+)/(?P<pod_name>[a-z0-9_-]+)/(?P<container_name>[a-z0-9_-]+)$', consumers.LogConsumer.as_asgi()),
]

注意:定义websocket路由时,推荐使用常见的路径前缀 (如/ws) 来区分 WebSocket 连接与普通 HTTP 连接, 因为它将使生产环境中部署 Channels 更容易,比如nginx把所有/ws的请求转给channels处理。

新增 consumers.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import json

import urllib3
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer

from dashboard.k8s import ContainerLog, K8SLogStreamThread

urllib3.disable_warnings()


class LogConsumer(WebsocketConsumer):
# websocket建立连接时执行方法
def connect(self):
# 从url里获取聊天室名字,为每个房间建立一个频道组
# self.room_name = self.scope['url_route']['kwargs']['room_name']
# print(self.scope)
self.namespace = self.scope['url_route']['kwargs']['namespace']
self.pod_name = self.scope['url_route']['kwargs']['pod_name']
self.container_name = self.scope['url_route']['kwargs']['container_name']
self.k8s_channel_name = 'chat_%s_%s_%s' % (self.namespace, self.pod_name, self.container_name)
self.log_api = ContainerLog(self.scope)
self.stream = None

# 将当前频道加入频道组
async_to_sync(self.channel_layer.group_add)(
self.k8s_channel_name,
self.channel_name
)

# 接受所有websocket请求
self.accept()
self._int()

# websocket断开时执行方法
def disconnect(self, close_code):
async_to_sync(self.channel_layer.group_discard)(
self.k8s_channel_name,
self.channel_name
)
# 因为后端只发送给前端消息,不需要接受前端的消息,所以此函数可以不写,需要双向通信,需要使用此函数处理接收到的消息
# # 从websocket接收到消息时执行函数
# def receive(self, text_data):
# # text_data_json = json.loads(text_data)
# # message = text_data_json['message']
#
# # 发送消息到频道组,频道组调用chat_message方法
# async_to_sync(self.channel_layer.group_send)(
# self.k8s_channel_name,
# {
# 'type': 'chat_message',
# 'message': text_data
# }
# )

# 从频道组接收到消息后执行方法
def chat_message(self, event):
print('chat_message: %s' % event)
message = event['message']
# datetime_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

# 通过websocket发送消息到客户端
self.send(text_data=json.dumps({
'message': f'{message}'
}))

def _int(self):
try:
self.stream = self.log_api.get_container_log(namespace=self.namespace, pod_name=self.pod_name,
container=self.container_name)
except Exception as e:
self.close()
self.log_api.error_check(e)
return
K8SLogStreamThread(self, self.stream).start()

每个自定义的Consumer类一般继承同步的WebsocketConsumer类或异步的AysncWebSocketConsumer类,它自带 self.channel_name 和self.channel_layer 属性。前者是独一无二的长连接频道名,后者提供了 send(), group_send()和group_add() 3种方法, 可以给单个频道或一个频道组发信息,还可以将一个频道加入到组。

每个频道(channel)都有一个名字。拥有频道名称的任何人都可以向频道发送消息。
一个组(group)有一个名字。具有组名称的任何人都可以按名称向组添加/删除频道,并向组中的所有频道发送消息。

注意:虽然异步Consumer类性能更优,channels推荐使用同步consumer类 , 尤其是调用Django ORM或其他同步程序时,以保持整个consumer在单个线程中并避免ORM查询阻塞整个event。调用channel_layer提供的方法时需要用async_to_sync转换一下。

除此以外,我们还使用了 self.scope[‘url_route’][‘kwargs’][‘xxxx’]从路由中获取了一些参数,在channels程序中,scope是个很重要的对象,类似于django的request对象,它代表了当前websocket连接的所有信息。你可以通过scope[‘user’]获取当前用户对象,还可以通过scope[‘path’]获取当前当前请求路径。

urls.py

我们需要修改两个 url.py
一个是默认app里的,

1
2
3
4
5
6
7
8
9
10
# cat k8sDashboard/urls.py
from django.contrib import admin
from django.urls import path, include, re_path

from dashboard import views

urlpatterns = [
...
path('log/', include('log.urls')),
]

这个是log应用里的 url.py

1
2
3
4
5
6
7
8
# cat log/urls.py
from django.urls import re_path, path
from . import views

app_name = 'log'
urlpatterns = [
re_path('^pod/$', views.container_log, name='container_log'),
]

view.py

log的视图文件内指定了日志的模版文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# cat log/views.py
from django.shortcuts import render
from django.views.decorators.clickjacking import xframe_options_sameorigin

from dashboard import k8s


# Create your views here.

@xframe_options_sameorigin
@k8s.self_login_required
def container_log(request):
name = request.GET.get('name')
namespace = request.GET.get('namespace')
return render(request, "container-log.html", {'name': name, 'namespace': namespace})

container-log.html 前端文件

我的前端使用的layui,主要看scripts部分,实现了前端通过长连接接受到数据并实时显示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
<html lang="en">
<head>
<meta charset="UTF-8">
<title>container日志查看 </title>
<link rel="stylesheet" href="/static/layui/css/layui.css">
</head>
<body>
<form class="layui-form" style="margin-left: 1px">
<div class="layui-inline">
<label class="layui-form-label">container</label>
<div class="layui-input-block layui-form">
<select name='pod_log' id='pod_log' lay-filter="pod_log" lay-skin="notepad"></select>
</div>
</div>
</form>

<div class="layui-card layui-panel-body">
<div class="layui-card-body">
<pre class="layui-code" id="container_log">
</pre>
</div>
</div>
<script src="/static/ace/ace.js" type="text/javascript" charset="utf-8"></script>
<script src="/static/ace/theme/theme-chrome.js" type="text/javascript" charset="utf-8"></script>
<script src="/static/ace/mode/mode-yaml.js" type="text/javascript" charset="utf-8"></script>
<script src="/static/layui/layui.js"></script>
{% csrf_token %}
<script>
layui.use(['form', 'jquery'], function () {
const form = layui.form
, $ = layui.jquery;
// select 动态获取内容 设置这里的变量要添加引号
const namespace = "{{ namespace }}"
const name = "{{ name }}"
// websocket 处理函数独立出来,方便多次调用
function logWebsocket(data){
const container_name = data;
const ws = new WebSocket("ws://" + window.location.host + "/ws/log/" + namespace + '/' + name + '/' + container_name);
ws.onopen = function () {
// 新链接,清空div里的内容
document.getElementById("container_log").innerText = ""
};
ws.onmessage = function (evt) {
// 将日志信息显示在页面上
const log_div = document.getElementById("container_log");
// Blob 内容需要使用FileReader 读取
// 参考链接: https://www.geeklive.cn/2023/03/03/javascripts-blob/undefined/javascripts-blob/
var reader = new FileReader()
reader.addEventListener('loadend', function (e) {
// log_div 增加内容
log_div.innerHTML += e.target.result
})
reader.readAsText(evt.data)

};
ws.onclose = function () {
console.log("websocket closed");
};
return false;
}

$.ajax({
url: '{% url "container_api" %}?namespace=' + namespace + '&name=' + name,
type: "GET",
async: false,
success: function (res) {
let container_select = $("#pod_log")
if (res.code == 0) {
//遍历数据,添加到select选项中
$.each(res.data, function (index) {
container_select.append('<option value=' + res.data[index] + '>' + res.data[index] + '</option>')
})
form.render()
logWebsocket(res.data[0])
}
},
error: function (res) {
layer.msg("服务器接口异常")
}
})

form.on('select(pod_log)', function (data) {
logWebsocket(data.value)
})
})
</script>
</body>
</html>

测试结果

如果没有问题的话目录结构应该是这样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# tree -L 2 
├── k8sDashboard
│   ├── __init__.py
│   ├── __pycache__
│   ├── asgi.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── log
│   ├── __init__.py
│   ├── __pycache__
│   ├── admin.py
│   ├── apps.py
│   ├── consumers.py
│   ├── migrations
│   ├── models.py
│   ├── routing.py
│   ├── tests.py
│   ├── urls.py
│   └── views.py
├── manage.py
├── templates
│   ├── ace-edit.html
│   ├── base.html
│   ├── chat
│   ├── config
│   ├── container-log.html
│   ├── index.html
│   ├── k8s
│   ├── loadbalancer
│   ├── login.html
│   └── workload

启动后访问页面

日志

参考文档

本文作者 : WGY
原文链接 : http://geeklive.cn/2023/03/03/django-k8s-log/undefined/django-k8s-log/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!
留下足迹