51Testing软件测试论坛

 找回密码
 (注-册)加入51Testing

QQ登录

只需一步,快速开始

微信登录,快人一步

查看: 877|回复: 0

Shell搞定Linux命令审计其实很简单!

[复制链接]

该用户从未签到

发表于 2022-10-10 17:18:38 | 显示全部楼层 |阅读模式
首先,当谈到Linux的操作审计需求时,大多数我们希望的是还原线上服务器被人为(误)操作时执行的命令行,以及它关联的上下文。这个需求场景其实跟通用的业务日志采集一致,简单一点可以直接通过

history 将内容发给 syslog,复杂一点的采用 auditd 或 ebpf 在内核层面上捕获行为。

  不过本文不打算对上述的方案做原理解释,仅仅站在一个运维小白的角度来完成日常 80%(80%的数据来源?我也不知道,大概是二八原则)的操作审计 。既然文章标题是用 Shell来完成, 由此可见今



天的主题跟 Bash 脱不了关系了。

  一句话概括今天的主题:利用定制 Bash 源增加日志审计功能,并将用户操作发给 rsyslog 聚合,最后在 elasticsearch 做日志存储和查询。


  Linux 部分

  1. 准备一些必要的工具

  ·rsyslog:  一个Linux上自带并兼容 syslog 语法的日志处理服务


  · jq: 一个在 shell 下处理 json 数据的小工具


  · logger: 一个可以往 syslog 输入日志的工具


  2. 这些小工具除 jq 外,大多操作系统发行版都自带,如果没有的话也可以直接用操作系统内置的包管理工具安装。


  ash.audit.sh,并将其拷贝到 /etc/profile.d/ 目录下:


  if [ "${SHELL##*/}" != "bash" ]; then


    return


  fi


  if [ "${AUDIT_READY}" = "yes" ]; then


      return


  fi


  declare -rx HISTFILE="$HOME/.bash_history


  declare -rx HISTSIZE=500000


  declare -rx HISTFILESIZE=500000


  declare -rx HISTCONTROL=""


  declare -rx HISTIGNORE=""


  declare -rx HISTCMD


  declare -rx AUDIT_READY="yes"


  shopt -s histappend


  shopt -s cmdhist


  shopt -s histverify


  if shopt -q login_shell && [ -t 0 ]; then


    stty -ixon


  fi


  if groups | grep -q root; then


    declare -x TMOUT=86400


    # chattr +a "$HISTFILE"


  fi


  declare -a LOGIN_INFO=( $(who -mu | awk '{print $1,$2,$6}') )


  declare -rx AUDIT_LOGINUSER="${LOGIN_INFO[0]}"


  declare -rx AUDIT_LOGINPID="${LOGIN_INFO[2]}"


  declare -rx AUDIT_USER="$USER"


  declare -rx AUDIT_PID="$$"


  declare -rx AUDIT_TTY="${LOGIN_INFO[1]}"


  declare -rx AUDIT_SSH="$([ -n "$SSH_CONNECTION" ] && echo "$SSH_CONNECTION" | awk '{print $1":"$2"->"$3":"$4}')"


  declare -rx AUDIT_STR="$AUDIT_LOGINUSER  $AUDIT_LOGINPID  $AUDIT_TTY  $AUDIT_SSH"


  declare -rx AUDIT_TAG=$(echo -n $AUDIT_STR | sha1sum |cut -c1-12)


  declare -x AUDIT_LASTHISTLINE=""


  set +o functrace


  shopt -s extglob


  function AUDIT_DEBUG() {


    if [ -z "$AUDIT_LASTHISTLINE" ]; then


      local AUDIT_CMD="$(fc -l -1 -1)"


      AUDIT_LASTHISTLINE="${AUDIT_CMD%%+([^ 0-9])*}"


    else


      AUDIT_LASTHISTLINE="$AUDIT_HISTLINE"


    fi


    local AUDIT_CMD="$(history 1)"


    AUDIT_HISTLINE="${AUDIT_CMD%%+([^ 0-9])*}"


    if [ "${AUDIT_HISTLINE:-0}" -ne "${AUDIT_LASTHISTLINE:-0}" ] || [ "${AUDIT_HISTLINE:-0}" -eq "1" ]; then


      MESSAGE=$(jq -c -n \


       --arg pwd "$PWD" \


       --arg cmd "${AUDIT_CMD##*( )?(+([0-9])?(\*)+( ))}" \


       --arg user "$AUDIT_LOGINUSER" \


       --arg become "$AUDIT_USER" \


       --arg pid "$AUDIT_PID" \


       --arg info "${AUDIT_STR}" \


       '{cmd: $cmd, user: $user, become: $become, pid: $pid, pwd: $pwd, info: $info}')


      logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE"


    fi


  }


  function AUDIT_EXIT() {


    local AUDIT_STATUS="$?"


    if [ -n "$AUDIT_TTY" ]; then


      MESSAGE_CLOSED=$(jq -c -n \


          --arg action "session closed" \


          --arg user "$AUDIT_LOGINUSER" \


          --arg become "$AUDIT_USER" \


          --arg pid "$AUDIT_PID" \


          --arg info "${AUDIT_STR}" \


          '{user: $user, become: $become, pid: $pid, action: $action, info: $info}')


      logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE_CLOSED"


    fi


    exit "$AUDIT_STATUS"


  }


  declare -frx +t AUDIT_DEBUG



  declare -frx +t AUDIT_EXIT


  if [ -n "$AUDIT_TTY" ]; then

    MESSAGE_OPENED=$(jq -c -n \


        --arg action "session opened" \


        --arg user "$AUDIT_LOGINUSER" \


        --arg become "$AUDIT_USER" \


        --arg pid "$AUDIT_PID" \


        --arg info "${AUDIT_STR}" \


        '{user: $user, become: $become, pid: $pid, action: $action, info: $info}')


    logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE_OPENED"


  fi


  declare -rx PROMPT_COMMAND="[ -n "$AUDIT_DONE" ] && echo ''; AUDIT_DONE=; trap 'AUDIT_DEBUG && AUDIT_DONE=1; trap DEBUG' DEBUG


  declare -rx BASH_COMMAND


  declare -rx SHELLOPT


  trap AUDIT_EXIT EXIT


  简单说明下这个脚本,大致就是定义了 shell 的历史条目、登录超时时间、以及审计日志的格式和发送。


  3. 配置rsyslog 客户端,本地创建一个 /etc/rsyslog.d/40-audit.conf 文件,用于将本地local6级别的系统日志发送远端的rsyslog服务集中处理。


  $RepeatedMsgReduction off


  local6.info @<>:514


  & stop


  配置完成后,别忘了重启下 rsyslog 服务!


  数据部分


  数据部分顾名思义,用于接收并处理客户端发来的操作系统日志。这里我们用到了 rsyslog 和 elasticsearch 两个服务了。

  1. 准备rsyslog-elasticsearch


  要让 rsyslog 将日志发送给 elastichsearch,我们就必须安装它的 es 模块


  # Ubuntu  


  apt-get install -y rsyslog-elasticsearch rsyslog-mmjsonparse  


  #CentOS


  yum install rsyslog-elasticsearch rsyslog-mmjsonparse


  2. 准备 ElasticSearch 服务


  为了简单部署,本文直接用 docker 快速拉起一个 ES 服务


  docker run -d --name elasticsearch  -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.3.1



  配置 rsyslog 服务端,创建一个文件 /etc/rsyslog.d/40-audit-server.conf,用于定义日志的写入策略。


  $RepeatedMsgReduction off


  $ModLoad imudp


  $UDPServerRun 514


  module(load="mmjsonparse")          # for parsing CEE-enhanced syslog messages


  module(load="omelasticsearch")      # for outputting to Elasticsearch


  #try to parse a structured log



  # this is for index names to be like: rsyslog-YYYY.MM.DD

  template(name="rsyslog-index" type="string" string="bashaudit-%$YEAR%.%$MONTH%.%$DAY%")


  # this is for formatting our syslog in JSON with @timestamp


  template(name="json-syslog" type="list") {


      constant(value="{")


        constant(value="\"@timestamp\":\"")     property(name="timegenerated" dateFormat="rfc3339" date.inUTC="on")


        constant(value="\",\"host\":\"")        property(name="fromhost-ip")


        constant(value="\",\"severity\":\"")    property(name="syslogseverity-text")


        constant(value="\",\"facility\":\"")    property(name="syslogfacility-text")


        constant(value="\",\"program\":\"")     property(name="programname")


        constant(value="\",\"tag\":\"")         property(name="syslogtag" format="json")


        constant(value="\",")                   property(name="$!all-json" position.from="2")


      # closing brace is in all-json


  }


  if ($syslogfacility-text == 'local6' and $syslogseverity-text == 'info') then {


   action(type="mmjsonparse")


   action(type="omelasticsearch" template="json-syslog"
searchIndex="rsyslog-index" dynSearchIndex="on" server="<your_elasticsearch_address>" serverport="



<your_elasticsearch_port>")


          # action(type="omfile" file="/var/log/bashaudit.log")

          stop


  }


  这里采用了 rsyslog 的两个 module 来处理收集的日志


  ·mmjsonparse用于 json 格式化日志


  · omelasticsearch用于配置 ElastichSearch


  · 配置完成重启 rsyslog 服务


  查询部分


  审计日志的查询我们可以使用 Kibana 或者自己根据 ElasticSearch API 进行二次开发。这里我们以 Kibana 举例。

  cat << EOF   > ./kibana.yml


  server.port: 15601


  elasticsearch.hosts: ["http://<your_elasticsearch_address>:<your_elasticsearch_port>"]


  i18n.locale: "zh-CN"


  EOF


  docker run -d --ulimit nofile=1000000:1000000 --net host --name elasticsearch-audit -v ./kibana.yml:/usr/share/kibana/config/kibana.yml  --restart always



docker.elastic.co/kibana/kibana-oss:7.3.1



  本地访问http://localhost:15601进入 kibana 配置创建一个名为 bashaudit 的索引模式。




之后,我们就能进入 Discover 中查询审计日志了,包含了基本Shell执行时间、来源用户、执行目录等数据。




再进一步,我们也可以通过调用 API 的方式对审计日志做一些额外的二次开发,例如:


  ·对线上服务器热点用户统计


  · 对线上服务器做热点操作统计


  · 对线上危险Shell 操作做告警








总结

  本文讲述了采用定制 Bash 的方式,在用户登录初始化 Shell 的方式将其后续的命令行操作发送给 rsyslog 服务进行处理,并将格式化后的日志存储在 ElasticSearch 中方便辅助系统管理者在线上故障定


位时使用,也可以依此对 Linux命令行审计做可视化的二次开发。




  不过本文基于定制 Bash 的方式仍然具备很多局限性,例如:

  ·不能审计 ShellScript 内的执行逻辑;


  · 存在用其他 shell 绕过审计,如 zsh 等;


  可以看到要想审计到更详细的内容,光在 Bash(表面功夫)上实现并不能满足,读者可以尝试使用snoopy 对 Shell 脚本内部做跟踪审计。







本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?(注-册)加入51Testing

x
回复

使用道具 举报

本版积分规则

关闭

站长推荐上一条 /1 下一条

小黑屋|手机版|Archiver|51Testing软件测试网 ( 沪ICP备05003035号 关于我们

GMT+8, 2024-3-29 17:06 , Processed in 0.078979 second(s), 25 queries .

Powered by Discuz! X3.2

© 2001-2024 Comsenz Inc.

快速回复 返回顶部 返回列表