Motivation

I want something to collect service logs in one place, so they can be searched in a structured way and exceptions tracked down quickly.

ELK = Elasticsearch + Logstash + Kibana

Docker deploy

There is a prebuilt image that works out of the box: sebp/elk:8.3.3

The image bundles the whole ELK stack; just run the container and everything is available.

docker compose example

version: "3"
services:
  elk:
    image: sebp/elk:8.3.3
    ports:
      - "5601:5601" # Kibana
      - "9200:9200" # Elasticsearch
      - "5044:5044" # Logstash
    volumes:
      - "./filebeat.conf:/etc/logstash/conf.d/02-beats-input.conf"

Disable SSL/TLS

SSL/TLS is enabled by default, and for my own light use I don't want to set up certificates: see the image's Disabling SSL/TLS instructions.

Tweak the config and mount it into the container to override the original:

# filebeat.conf
input {
  beats {
    port => 5044
  }
}

A very simple config: the input is beats, listening on port 5044. Since it replaces the image's default 02-beats-input.conf, the SSL settings configured there are dropped, so Filebeat can connect without certificates.

Filebeat

Grab the official Docker image: docker.elastic.co/beats/filebeat:8.5.2

Run the container and it is ready to go.
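For example, a compose service along these lines (a sketch, not the one true setup: the filebeat.yml it mounts is the config from the next section, and the elk hostname, logs directory, and LOGSTASH_HOSTS value are assumptions to adapt):

services:
  filebeat:
    image: docker.elastic.co/beats/filebeat:8.5.2
    environment:
      # host:port of the Logstash beats input (the ELK container above)
      - LOGSTASH_HOSTS=elk:5044
    volumes:
      # Filebeat's default config location in the official image
      - "./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro"
      # the directory the application writes its JSON log files to
      - "./logs:/usr/share/filebeat/logs:ro"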

filebeat config

output:
  logstash:
    enabled: true
    hosts:
      - ${LOGSTASH_HOSTS}
    timeout: 15
filebeat:
  inputs:
    - type: log
      enabled: true
      json:
        keys_under_root: true
        overwrite_keys: true
        add_error_key: true
        expand_keys: true
      paths:
        - "/usr/share/filebeat/logs/*.log"

input

paths: the log file paths Filebeat watches.

json: besides the paths to read, JSON decoding is configured so each log line is parsed into structured fields. keys_under_root lifts the decoded keys to the top level of the event, overwrite_keys lets them override Filebeat's own fields, add_error_key records decoding failures, and expand_keys expands dotted keys into nested objects. An example follows below.

output

hosts: the host:port of Logstash, read from the LOGSTASH_HOSTS environment variable.
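To make the json options concrete, here is a hypothetical example (the user_id field is made up) of one line a service might append to a watched log file, sketched in Python:

import json

# One JSON document per line is what the json.* settings above expect.
line = json.dumps({"level": "INFO", "message": "user logged in", "user_id": 42})
print(line)  # {"level": "INFO", "message": "user logged in", "user_id": 42}

# With keys_under_root: true, Filebeat lifts level, message and user_id to the
# top level of the event instead of nesting them under json.*; with
# overwrite_keys: true the decoded message replaces Filebeat's own message
# field, and add_error_key records an error field if a line is not valid JSON.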

Frequently encountered issues

limit mmap counts

Elasticsearch is not starting (1): max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144]

Fix: set vm.max_map_count=262144 in /etc/sysctl.conf on the Docker host, then run sysctl -p to apply it (or sysctl -w vm.max_map_count=262144 for the current boot only).

see: Virtual memory

Python Log Format

Customize a logging formatter that serializes each record as one JSON document per line; adapted from python-logstash:

import json
import logging
import traceback
from datetime import datetime


class LogstashFormatter(logging.Formatter):
    def __init__(self, message_type="Logstash", tags=None):
        super().__init__()
        self.message_type = message_type
        self.tags = tags if tags else []

    def get_extra_fields(self, record):
        # The list contains all the attributes listed in
        # http://docs.python.org/library/logging.html#logrecord-attributes
        skip_list = (
            "args",
            "asctime",
            "created",
            "exc_info",
            "exc_text",
            "filename",
            "id",
            "levelname",
            "module",
            "msecs",
            "message",
            "msg",
            "name",
            "pathname",
            "relativeCreated",
            "extra",
            "auth_token",
            "password",
            "stack_info",
        )

        easy_types = (str, bool, dict, float, int, list, type(None))

        fields = {}

        # Anything attached to the record via `extra=` ends up in __dict__;
        # keep JSON-friendly values as-is and repr() everything else.
        for key, value in record.__dict__.items():
            if key not in skip_list:
                if isinstance(value, easy_types):
                    fields[key] = value
                else:
                    fields[key] = repr(value)

        return fields

    def get_debug_fields(self, record):
        fields = {
            "stack_trace": self.format_exception(record.exc_info),
        }
        return fields

    @classmethod
    def format_source(cls, message_type, host, path):
        return "%s://%s/%s" % (message_type, host, path)

    @classmethod
    def format_timestamp(cls, time):
        # Render the record's epoch timestamp as ISO 8601 with millisecond
        # precision, e.g. 2023-01-02T03:04:05.678Z
        tstamp = datetime.utcfromtimestamp(time)
        return (
            tstamp.strftime("%Y-%m-%dT%H:%M:%S")
            + ".%03d" % (tstamp.microsecond / 1000)
            + "Z"
        )

    @classmethod
    def format_exception(cls, exc_info):
        return "".join(traceback.format_exception(*exc_info)) if exc_info else ""

    @classmethod
    def serialize(cls, message):
        return bytes(json.dumps(message, default=str), "utf-8")

    def format(self, record):
        # Create message dict
        message = {
            "@timestamp": self.format_timestamp(record.created),
            "@version": "1",
            "message": record.getMessage(),
            "path": record.pathname,
            "tags": self.tags,
            "type": self.message_type,
            # Extra Fields
            "level": record.levelname,
            "logger_name": record.name,
        }

        # Add extra fields
        message.update(self.get_extra_fields(record))
        if record.exc_info:
            message.update(self.get_debug_fields(record))

        return json.dumps(message)

Then attach this formatter to a logging.FileHandler, write the JSON lines to a log file, and let Filebeat pick up the output.
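A minimal wiring sketch (assuming logs/ is the directory mounted into the Filebeat container; the logger name and the order_id field are made up for illustration):

import logging

handler = logging.FileHandler("logs/app.log")
handler.setFormatter(LogstashFormatter(message_type="my-service", tags=["demo"]))

logger = logging.getLogger("my-service")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Keys passed via extra= are picked up by get_extra_fields and become
# top-level fields in the JSON document.
logger.info("order created", extra={"order_id": 123})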