はじめに
- ベンダーが公開しているセキュリティアドバイザリ(RSS)を購読して脆弱性情報を検知
- 脆弱性を含むパッケージがOS上にインストールされているかを確認
という流れを手動で実施しました。
今回は一連の流れを自動化できるようにしたいと思い、仕組みを考えてみました。
概要
AWS Systems Manager インベントリを利用することで対象EC2インスタンスからメタデータを収集することが可能ですので、これを利用してメタデータに含まれるインストール済みパッケージを取得することにします。
手動で行っていたのと同様に「取得したインストール済みパッケージ情報」と「AWSが公開しているセキュリティアドバイザリRSS」を突き合わせて、脆弱性を含むインストール済みパッケージの有無を調べます。
前準備(インベントリ収集の設定)
Systems Managerインベントリでのメタデータ収集を可能とするため、事前に調査対象となるEC2インスタンスとインベントリの関連付けを行っておく必要があります。
インベントリ収集の設定を参考に、関連付け(及びその前提となるSSM Agentの更新など)を行います。
前準備(EC2インスタンスへのタグ付与)
調査対象とするEC2インスタンスを制限できるように、対象EC2インスタンスにはInventory
という名前のタグを付与し、値がTrue
のEC2インスタンスのみ調査対象とするようにします。
前準備(必要なIAMポリシーをアタッチ)
タグが付与されたEC2インスタンス一覧を取得するためのAmazonEC2ReadOnlyAccess
ポリシーと、Systems Managerインベントリでインストール済みパッケージを取得するためのAmazonSSMReadOnlyAccess
ポリシーを、突き合わせを行う環境にアタッチしておきます。
検証(対象EC2インスタンス一覧の取得)
CLIで、タグが付与されたEC2インスタンス一覧を取得してみます。
aws ec2 describe-instances --filters "Name=tag:Inventory,Values=True"
を実行すると
{ "Reservations": [ { "Instances": [ { "InstanceId": "i-xxxxxxxxxxxxxxxxx",
対象EC2インスタンスIDを含むEC2インスタンス情報一覧を取得することができました。
検証(インストール済みパッケージ一覧の取得)
CLIで、Systems Managerインベントリで収集されたメタデータを取得してみます。
先ほど取得した対象EC2インスタンスIDをオプションで指定します。
aws ssm list-inventory-entries --instance-id i-xxxxxxxxxxxxxxxxx --type-name "AWS:Application"
を実行すると
{ "InstanceId": "i-xxxxxxxxxxxxxxxxx", "Entries": [ { "Name": "cryptsetup-libs", "Version": "2.6.1" }, ...... ], 'NextToken': 'xxxxxx', ......
インストール済みパッケージ一覧を含むメタデータを取得することができました。
ただ、取得できたパッケージ数が少ないように思い、CLIコマンドリファレンスを確認すると、どうやら一度に取得できる項目数が最大50個までとなっており、全てのインストール済みパッケージを取得するにはCLI実行時のレスポンスに含まれるNextToken
を使って再度CLIを実行する必要があるようです。
aws ssm list-inventory-entries --instance-id i-xxxxxxxxxxxxxxxxx --type-name "AWS:Application" --max-results 50 --next-token "xxxxxx"
トークンを指定して再度CLIを実行すると
{ 'InstanceId': 'i-xxxxxxxxxxxxxxxxx', 'Entries': [ { 'Name': 'util-linux-core', 'Version': '2.37.4' }, ......
初回実行時に取得したメタデータの続きを取得することができました。
処理を自動化する
検証結果をもとにして、以下のようなPythonスクリプトを作成しました。
import datetime import json import os import re import time import boto3 import feedparser RSS_DATETIME_FORMAT = '%a, %d %b %Y %H:%M:%S %Z' AMAZON_LINUX_1_SECURITY_ADVISORIES_RSS = 'https://alas.aws.amazon.com/index.html' AMAZON_LINUX_2_SECURITY_ADVISORIES_RSS = 'https://alas.aws.amazon.com/alas2.html' AMAZON_LINUX_2023_SECURITY_ADVISORIES_RSS = 'https://alas.aws.amazon.com/AL2023/alas.rss' MAX_LOOP_COUNT = 100 def rss_severity(rss_entry): """ RSSエントリーから重要度を取得する """ return re.search('(?<=\().+?(?=\))', rss_entry.title).group() def rss_package_name(rss_entry): """ RSSエントリーからパッケージ名を取得する """ return re.search(': (.*)', rss_entry.title).group(1) def is_rss_updated(rss_entry, days): """ RSSエントリーが指定期間内に更新されたものかを検証する """ today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) return (today - (datetime.datetime.strptime(rss_entry.updated, RSS_DATETIME_FORMAT) + datetime.timedelta(hours=9)).replace(hour=0, minute=0, second=0, microsecond=0)).days <= days def is_rss_critical_entry(rss_entry): """ RSSエントリーの重要度が critical 以上であるかを検証する """ target_severity = ['critical'] return rss_severity(rss_entry) in target_severity def is_rss_important_entry(rss_entry): """ RSSエントリーの重要度が important 以上であるかを検証する """ target_severity = ['important', 'critical'] return rss_severity(rss_entry) in target_severity def get_target_ec2_instance_ids(): """ InventoryタグにTrueの値を持つEC2インスタンスIDリストを取得する """ client = boto3.client('ec2') response = client.describe_instances( Filters=[ { 'Name': 'instance-state-name', 'Values': ['running'], }, { 'Name': 'tag:Inventory', 'Values': [ 'True', ] }, ] ) return [reservation['Instances'][0]['InstanceId'] for reservation in response['Reservations']] def get_ec2_installed_packages(ec2_instance_id): """ 指定されたEC2インスタンスのインストール済みパッケージリストを取得する """ client = boto3.client('ssm') has_next = True next_token = '' loop_count = 0 installed_packages = [] while has_next and loop_count < MAX_LOOP_COUNT: time.sleep(0.1) loop_count += 1 response = None if not next_token: response = client.list_inventory_entries( InstanceId=ec2_instance_id, TypeName='AWS:Application', ) else: response = client.list_inventory_entries( InstanceId=ec2_instance_id, TypeName='AWS:Application', NextToken=next_token, ) for inventory_entry in response['Entries']: package = { 'Name' : inventory_entry['Name'], 'InstalledVersion' : inventory_entry['Version'], } installed_packages.append(package) next_token = response.get('NextToken') if not next_token: has_next = False return installed_packages def get_vulnerable_packages(ec2_instance_id, rss_entries): """ 指定されたEC2インスタンスにインストールされている脆弱性パッケージリストを取得する """ installed_packages = get_ec2_installed_packages(ec2_instance_id) vulnerable_packages = [] for rss_entry in rss_entries: for installed_package in installed_packages: if rss_package_name(rss_entry) == installed_package['Name']: package = { 'Name' : installed_package['Name'], 'InstalledVersion' : installed_package['InstalledVersion'], } vulnerable_packages.append(package) return [dict(s) for s in set(frozenset(package.items()) for package in vulnerable_packages)] def main(): rss = feedparser.parse(AMAZON_LINUX_2023_SECURITY_ADVISORIES_RSS) rss_entries = [rss_entry for rss_entry in rss.entries if is_rss_updated(rss_entry, 7) and is_rss_important_entry(rss_entry)] ec2_instance_ids = get_target_ec2_instance_ids() for ec2_instance_id in ec2_instance_ids: print(ec2_instance_id) print(get_vulnerable_packages(ec2_instance_id, rss_entries)) if __name__ == '__main__': main()
AWSから情報を取得するのにAWS SDK for Python (boto3)
を、RSSをパースするのにfeedparser
を利用していますので、実行前にインストールしておきます。
pip3 install boto3 pip3 install feedparser
検証時に利用したPythonおよびpipのバージョンは以下の通りです。
# python3 --version Python 3.9.16 # pip3 --version pip 21.3.1
上記スクリプトを実行すると
i-xxxxxxxxxxxxxxxxx [{'Name': 'squid', 'InstalledVersion': '5.8'}] ......
EC2インスタンスにインストールされている脆弱性を含むパッケージ一覧を取得することができました。
おわりに
今回は取得した情報を画面に表示しましたが、実際の運用ではSlackやメールで担当者に通知するといったやり方が考えられます。
今後も適宜見直しを行い、運用方法を改善していきたいと思います。