对于 Linux 操作审计的需求,我们通常希望能够还原线上服务器被人为操作时执行的命令行及其相关上下文。这个需求与通用的业务日志采集类似,可以通过简单的 history 命令将内容发送给 syslog,或者使用更复杂的 auditd 或 ebpf 在内核层面捕获行为。
本文不会对上述方案进行原理解释,而是从运维小白的角度来完成日常 80% 的操作审计(80% 的数据来源不详,可能是二八原则)。文章标题表明我们将使用 Shell 来完成这一任务,因此今天的主题与 Bash 密不可分。
简而言之,今天的主题是:通过定制 Bash 源代码来增加日志审计功能,并将用户操作发送给 rsyslog 进行聚合,最后在 elasticsearch 中进行日志存储和查询。
Linux 部分
-
准备一些必要的工具
-
rsyslog: 一个Linux上自带并兼容 syslog 语法的日志处理服务 -
jq: 一个在 shell 下处理 json 数据的小工具 -
logger: 一个可以往 syslog 输入日志的工具
“
这些小工具除 jq 外,大多操作系统发行版都自带,如果没有的话也可以直接用操作系统内置的包管理工具安装。
”
-
定制 bash.audit.sh,并将其拷贝到 /etc/profile.d/ 目录下
if [ "${SHELL##*/}" != "bash" ]; then
return
fi
if [ "${AUDIT_READY}" = "yes" ]; then
return
fi
declare -rx HISTFILE="$HOME/.bash_history"
declare -rx HISTSIZE=500000
declare -rx HISTFILESIZE=500000
declare -rx HISTCONTROL=""
declare -rx HISTIGNORE=""
declare -rx HISTCMD
declare -rx AUDIT_READY="yes"
shopt -s histappend
shopt -s cmdhist
shopt -s histverify
if shopt -q login_shell && [ -t 0 ]; then
stty -ixon
fi
if groups | grep -q root; then
declare -x TMOUT=86400
# chattr +a "$HISTFILE"
fi
declare -a LOGIN_INFO=( $(who -mu | awk '{print $1,$2,$6}') )
declare -rx AUDIT_LOGINUSER="${LOGIN_INFO[0]}"
declare -rx AUDIT_LOGINPID="${LOGIN_INFO[2]}
"
declare -rx AUDIT_USER="$USER"
declare -rx AUDIT_PID="$$"
declare -
rx AUDIT_TTY="${LOGIN_INFO[1]}"
declare -rx AUDIT_SSH="$([ -
n "$SSH_CONNECTION" ] && echo "$SSH_CONNECTION" | awk '{print $1":"$2"->"$3":"$4}')
"
declare -rx AUDIT_STR="$AUDIT_LOGINUSER $AUDIT_LOGINPID $AUDIT_TTY $AUDIT_SSH"
declare -
rx AUDIT_TAG=$(echo -n $AUDIT_STR | sha1sum |cut -c1-12)
declare -x AUDIT_LASTHISTLINE=""
set +o functrace
shopt -s extglob
function AUDIT_DEBUG() {
if [ -z "$AUDIT_LASTHISTLINE" ]; then
local AUDIT_CMD="$(fc -l -1 -1)"
AUDIT_LASTHISTLINE="${AUDIT_CMD%%+([^ 0-9])*}"
else
AUDIT_LASTHISTLINE="$AUDIT_HISTLINE"
fi
local AUDIT_CMD="$(history 1)"
AUDIT_HISTLINE="${AUDIT_CMD%%+([^ 0-9])*}"
if [ "${AUDIT_HISTLINE:-0}" -
ne "${AUDIT_LASTHISTLINE:-0}" ] || [ "${AUDIT_HISTLINE:-0}" -eq "1" ]; then
MESSAGE=$(jq -c -n \
--arg pwd "$PWD" \
--arg cmd "${AUDIT_CMD##*( )?(+([0-9])?(\*)+( ))}" \
--arg user "$AUDIT_LOGINUSER" \
--arg become "$AUDIT_USER" \
--arg pid "$AUDIT_PID" \
--arg info "${AUDIT_STR}" \
'{cmd: $cmd, user: $user, become: $become, pid: $pid, pwd: $pwd, info: $info}')
logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE"
fi
}
function AUDIT_EXIT() {
local AUDIT_STATUS="$?"
if [ -n "$AUDIT_TTY" ]; then
MESSAGE_CLOSED=$(jq -c -n \
--arg action "session closed" \
--arg user "$AUDIT_LOGINUSER" \
--arg become "$AUDIT_USER" \
--arg pid "$AUDIT_PID" \
--arg info "${AUDIT_STR}" \
'{user: $user, become: $become, pid: $pid, action: $action, info: $info}')
logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE_CLOSED"
fi
exit "$AUDIT_STATUS"
}
declare -frx +t AUDIT_DEBUG
declare -frx +t AUDIT_EXIT
if [ -n "$AUDIT_TTY" ]; then
MESSAGE_OPENED=$(jq -c -n \
--arg action "session opened" \
--arg user "$AUDIT_LOGINUSER" \
--arg become "$AUDIT_USER" \
--arg pid "$AUDIT_PID" \
--arg info "${AUDIT_STR}" \
'{user: $user, become: $become, pid: $pid, action: $action, info: $info}')
logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE_OPENED"
fi
declare -rx
PROMPT_COMMAND="[ -
n "$AUDIT_DONE" ] && echo ''; AUDIT_DONE=; trap 'AUDIT_DEBUG && AUDIT_DONE=1;
trap DEBUG' DEBUG"
declare -rx BASH_COMMAND
declare -rx SHELLOPT
trap AUDIT_EXIT EXIT
简单说明下这个脚本,大致就是定义了 shell 的历史条目、登录超时时间、以及审计日志的格式和发送。
-
配置rsyslog 客户端,本地创建一个 /etc/rsyslog.d/40-audit.conf 文件,用于将本地 local6级别
的系统日志发送远端的rsyslog服务集中处理
$RepeatedMsgReduction off
local6.info @:514
& stop
“
配置完成后,别忘了重启下 rsyslog 服务!
”
数据部分
数据部分顾名思义,用于接收并处理客户端发来的操作系统日志。这里我们用到了 rsyslog 和 elasticsearch 两个服务了。
-
准备rsyslog-elasticsearch
要让 rsyslog 将日志发送给 elastichsearch,我们就必须安装它的 es 模块
# Ubuntu
apt-get install -y rsyslog-elasticsearch rsyslog-mmjsonparse
#CentOS
yum install rsyslog-elasticsearch rsyslog-mmjsonparse
-
准备 ElasticSearch 服务
为了简单部署,本文直接用 docker 快速拉起一个 ES 服务
docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-
node" elasticsearch:7.3.1
-
配置 rsyslog 服务端,创建一个文件 /etc/rsyslog.d/40-audit-server.conf,用于定义日志的写入策略。
$RepeatedMsgReduction off
$ModLoad imudp
$UDPServerRun 514
module(load="mmjsonparse") # for parsing CEE-enhanced syslog messages
module(load="omelasticsearch") # for outputting to Elasticsearch
#try to parse
a structured log
# this is for index names to be like: rsyslog-YYYY.MM.DD
template(name="rsyslog-index" type="string" string="bashaudit-%$YEAR%.%$MONTH%.%$DAY%")
# this is for formatting our syslog in JSON with @timestamp
template(name="json-syslog" type="list") {
constant(value="{")
constant(value="\"@timestamp\":\"") property(name="timegenerated"
dateFormat="rfc3339" date.inUTC="on")
constant(value="\",\"host\":\"") property(name="fromhost-ip")
constant(value="\",\"severity\":\"") property(name="syslogseverity-text")
constant(value="\",\"facility\":\"") property(name="syslogfacility-text")
constant(value="\",\"program\":\"") property(name="programname")
constant(value="\",\"tag\":\"") property(name="syslogtag" format="json")
constant(value="\",") property(name="$!all-json" position.from="2")
# closing brace is in all-json
}
if ($syslogfacility-text == 'local6' and $syslogseverity-text == 'info') then {
action(type="mmjsonparse")
action(type="omelasticsearch" template="json-syslog" searchIndex="rsyslog-
index" dynSearchIndex="on" server="" serverport="
" )
# action(type="omfile" file="/var/log/bashaudit.log")
stop
}
这里采用了 rsyslog 的两个 module 来处理收集的日志
-
mmjsonparse
用于 json 格式化日志 -
omelasticsearch
用于配置 ElastichSearch
“
配置完成重启 rsyslog 服务
”
查询部分
审计日志的查询我们可以使用 Kibana 或者自己根据 ElasticSearch API 进行二次开发。这里我们以 Kibana 举例。
cat ./kibana.yml
server.port: 15601
elasticsearch.hosts: ["http://:" ]
i18n.locale: "zh-CN"
EOF
docker run -d --ulimit nofile=1000000:1000000 --net host --name elasticsearch-audit -
v ./kibana.yml:/usr/share/kibana/config/kibana.yml --
restart always docker.elastic.co/kibana/kibana-oss:7.3.1
本地访问http://localhost:15601
进入 kibana 配置创建一个名为 bashaudit 的索引模式
之后,我们就能进入 Discover 中查询审计日志了,包含了基本Shell执行时间、来源用户、执行目录等数据。
再进一步,我们也可以通过调用 API 的方式对审计日志做一些额外的二次开发,例如:
-
对线上服务器热点用户统计 -
对线上服务器做热点操作统计 -
对线上危险Shell 操作做告警
总结
本文讲述了采用定制 Bash 的方式,在用户登录初始化 Shell 的方式将其后续的命令行操作发送给 rsyslog 服务进行处理,并将格式化后的日志存储在 ElasticSearch 中方便辅助系统管理者在线上故障定位时使用,也可以依此对 Linux命令行审计做可视化的二次开发。
不过本文基于定制 Bash 的方式仍然具备很多局限性,例如:
-
不能审计 ShellScript 内的执行逻辑; -
存在用其他 shell 绕过审计,如 zsh 等;
可以看到要想审计到更详细的内容,光在 Bash(表面功夫)上实现并不能满足,读者可以尝试使用snoopy 对 Shell 脚本内部做跟踪审计。
以上就是良许教程网为各位朋友分享的Linu系统相关内容。想要了解更多Linux相关知识记得关注公众号“良许Linux”,或扫描下方二维码进行关注,更多干货等着你 !