Introduction

Summary

AWS에서는 유용하게 사용 할 수 있는 log들이 있다. 그 중에서도 cloudtrail log를 수집/분석하여 정보보안 및 운영의 효율성을 높이는데에 도움이 되는 기술을 공유하고자 한다.

Backbround

AWS의 모든 서비스와 기능들은 서로 API로 연동되어 동작하고, API로 사용자와 상호 작용이 이뤄진다. 그렇기때문에 API들의 흐름을 제대로 파악해야 보안 감사가 가능하고, 이슈가 생겼을때 대응이 가능하다. cloudtrail log는 AWS의 모든 API 이밴트들을 기록하기때문에 이것을 활용하여 사용자의 비인가된 행위나 침해시도로 인한 이상징후를 탐지하기 위해서, 각 AWS 계정들의 cloudtrail log를 cloudwatch logs에 저장하고, 동시에 저장되는 log를 ElasticSearch로 보내어 분석하는 모델을 디자인하였다.

1. Architecture

1.1 구성도

  1. 각 AWS 계정의 CloudTrail log를 CloudWatch Logs로 전송
  2. CloudWatch Logs에 log stream이 전송되면, 트리거로 Lambda 함수를 호출 하면서, parameter값으로 cloudtrail log를 전달.
  3. 호출된 Lambda 함수는 cloudtrail log를 argument로 해서, ElasticSearch의 JSON 포맷에 맞춰 parsing.
  4. 여러개의 계정에서 수집되고 parsing된 cloudtrail log는 하나의 ElasticSearch로 HTTPS를 사용하여 전송.
  5. 수집된 cloudtrail log를 분석하여 보안 감사를 실행.

1.2 순서도

  1. 새로운 log가 cloudwatch logs로 인입되면, 동시에 트리거로 Lambda 함수를 호출 하면서, parameter값으로 cloudtrail log를 전달하게 된다.
  2. Lambda 함수에서는 가장 먼저 cloudtrail log를 json format으로 변환하고, 침해시도나 이상징후를 탐지하는데 불필요한 field들을 제거 한다(log timestamp, log source등).
  3. 기본 cloudtrail log만으로는 침해시도나 이상징후 분석에 활용 할 수 있는 정보가 약간 부족하기 때문에, IP Geolocation을 제공하는 SDK나 shodan API등을 사용한 결과를 customizing field로 추가한다.
  4. 완성된 log data를 다른 AWS Account에 있는 ElasticSearch로 보내기위해서 AWS Security token을 생성 후 request header에 추가한다.
  5. PUT method를 사용해서 왼성된 log data를 AWS Security token이 추가된 request header를 첨부하여 ElasticSearch로 보낸다.
  6. ElasticSearch에서는 log data를 받아서, 사전에 정의된 template에 따라서 mapping 후 index에 데이터를 추가한다.
  7. 수집된 데이터를 사용해서 침해시도나 이상징후를 분석한다.

3. 설정하기

3.1 CloudTrail

Event log를 CloudWatch Logs로 보내도록 설정.

  1. cloudtrail → cloudwatch logs에서 New or existing log group 지정.
  2. continue 버튼을 눌려서 설정완료.
  3. cloudwatch logs에서 지정한 log group으로 cloudtrail event log가 들어오는지 확인.
    cloudtrail 설정 참고문서

3.2 CloudWatch logs

cloudwatch log group의 log stream을 elasticsearch로 보내도록 설정.

  1. cloudwatch logs 그룹선택 → Stream to Amazon elasticsearch service 선택.
  2. 수집 하려는 cloudtrail이 elasticsearch와 같은 AWS 계정에 존재한다면, Select account에서 This Account를 선택한다.
  3. 수집 하려는 cloudtrail이 elasticsearch와 서로 다른 AWS 계정에 존재한다면, Another Account를 선택하고, AWS Elasticsearch ARN에서 elasticsearch의 ARN을 입력한다.
  4. AWS Elasticsearch Endpoint에는 elasticsearch의 Endpoint uri을 입력한다.
  5. Log Format은 cloudtrail을 선택한다. cloudwatch logs 설정 참고문서

3.3 Lambda

3.2 단계를 완료하면, cloudwatch logs가 바로 elasticsearch로 전송되는것 같지만, 사실 지정한 elasticsearch로 log를 보내기 위해서 Function name “LogsToElasticsearch_“으로 시작하는 lambda가 자동으로 생성되고 이것을 통해서 elasticsearch로 전송된다. 이 lambda의 code수정으로 기호에 맞춰 customizing field를 추가하거나 불필요한 field를 제거할 수 있다.

  1. Function name이 “LogsToElasticsearch_” 으로 시작하는 lambda를 열어서 코드 확인.
  2. transform 함수에 있는 logEvent.message 변수에 json format으로 parsing된 cloudtrail log가 있는데, 나는 sourceIPAddress field값의 geolocation을 알기위해 MaxMIND에서 제공하는 GeoIP2 API를 사용해서 조회한 결과값을 geo 라는 이름의 field에 추가했다.
//GeoIP SDK 추가
//다운로드 URI : https://dev.maxmind.com/geoip/geoip2/downloadable/#MaxMind_APIs
var jGeoIP = require('jgeoip');
var geoip = new jGeoIP('geodb/GeoLite2-City.mmdb');

//~
//중간코드 생략
//~

//여기는 transform 함수 시작부분.
function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }

    var bulkRequestBody = '';

    payload.logEvents.forEach(function(logEvent) {
        var timestamp = new Date(1 * logEvent.timestamp);

        // index name format: cwl-YYYY.MM.DD
        var indexName = [
            'cloudtrail-' + timestamp.getUTCFullYear(),              // year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');

        //logEvent.message 변수에 cloudtrail logs가 있음
        var source = buildSource(logEvent.message, logEvent.extractedFields);

        //AWS에서 추가하는 customizing field
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;

        //sourceIPAddress field값의 geolocation 조회/추가
        source.geoInfo = geoip.getRecord(logEvent.message.sourceIPAddress);

4. 분석하기

4.1 회사의 클라우드는 언제 가장 바쁘게 움직일까?

수집된 log를 분석할때, 가장 먼저 체크할 포인트는 “언제 가장 바쁘게 움직일까?” 이다. 바쁘게 움직이는 만큼, 바쁜 틈을타서 공격자들이 슬그머니 몰래 숨어들어올 가능성이 크다. 아래 이미지는 이벤트 개수를 측정한것으로 점심시간(12:00) 부터 클라우드가 서서히 분주하게 움직이고, 저녁시간 18:00 ~ 21:00까지 가장 바쁘게 움직이는것을 확인 할 수 있는데, 클라우드에서 보안의 장점중 하나는 바쁜 시간대에는 보안 시스템 리소스를 수직/수평적으로 확장시켜서 더욱 정밀하게 모니터링을 하고, 한가한 시간에는 리소스를 줄여서 비용을 절감 할 수 있다. 클라우드에서 보안 시스템을 어떻게 탄력적으로 운영할지 의사결정을 하는데에 가장 바쁘게 움직이는 시간을 파악하는것은 가장 중요한 요소이다.

4.1 어느 지역에서 접속할까?

클라우드를 사용하는 개발자 또는 연동된 시스템이 Seoul region에만 존재하는데, 이외의 global region에서 접근한 기록이 있다면 문제가 있을수 있고, 항상 주의해서 확인해야 한다. IP geolocation 정보를 cloudtrail log에 추가한것이 이때 유용하게 사용된다.

4.2 머신러닝으로 사용자 행위를 분석해보자

우아한형제들의 경우 보통 1개의 aws account에서 발생되는 aws event가 24시간 기준으로 약 800만개가 된다. 그럼, 이 800만개를 아무리 잘 필터 한다고 해도 날짜/시간/상황등에 따라 800만개의 event 내용이 조금씩 다르기때문에(어떤날은 ec2 관련 이벤트가 많고, 어떤날은 redis관련 이벤트가 많거나, 평소보다 event는 적은데 여러 서비스의가 동시에 움직인다거나 하기 때문에 분석을 위한 통계치를 잡기가 어렵다.)침해시도나 이상징후를 탐지하기는 어려움이 있다. 하지만 아래의 이미지와 같이 machine learning 기능을 사용하면, 스스로 추이를 학습하고 통계 기준을 변경하면서 cloudtrail의 여러 field값들의 상관 관계를 분석 가능하도록 설정 할 수가 있다. 나는 주로 population 기능을 사용해서 분석을 하고 있고, 현재 데이터를 3개월 정도 누적한 결과 99% 이상의 정확도로 이상징후를 탐지하고 있다.

Conclusion

cloudtrail log를 수집/분석하면 회사 AWS리소스가 얼마나 어떻게 사용되고 있는지를 파악 할 수 있고, 어느정도로 복잡한 구성인지 복잡도까지 계산이 가능하다. 이것은 참 중요한데, AWS 보안을 하기 위해서 서비스와 사용자들이 톱니바퀴와 같이 어떻게 서로 맞물려서 상호작용하고 움직이는지 알게되는 것이니 언제 어디서 어떻게 보안 이슈가 생길 수 있는지를 찾을 수 있고, 어느정도 규모의 보안 시스템이 필요할지 계산이 가능하다. 따라서 AWS 보안은 첫번째로 cloudtrail log 분석으로 시작해서 보안 전략을 만들어가는것을 추천하고 싶다.