Following the flow below, logs that arrive in S3 in AWS Account A via CloudWatch are forwarded to an S3 bucket in Account B, and a Lambda in Account B then bulk-inserts the data into Elasticsearch.


┌─ AWS Account A───┐     ┌───────────AWS Account B ───────────────┐
│                  │     │                                        │
│                  │     │                                        │
│ ┌─────────────┐  │     │ ┌────────────┐      ┌────────────────┐ │
│ │  S3 log     ├──┼─────┤►│ S3 log     ├────► │ ElasticSearch  │ │
│ └─────────────┘  │ ▲   │ └────────────┘  ▲   └────────────────┘ │
│                  │ │   │                 │                      │
│                  │ │   │                 │                      │
│ ┌─────────────┐  │ │   │           ┌─────┴─────┐                │
│ │  LambdaA    ├──┼─┘   │           │ LambdaB   │                │
│ └─────────────┘  │     │           └───────────┘                │
│                  │     │                                        │
└──────────────────┘     └────────────────────────────────────────┘
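
Both Lambdas are driven by S3 ObjectCreated notifications (that is where event['Records'] in the handlers comes from). Below is a minimal one-off setup sketch for wiring that trigger with boto3; the bucket name, function name, and ARN are placeholders, not values from this setup, and the same two calls would be repeated in Account B to hook LambdaB to the destination bucket.

# One-off setup sketch (assumed names/ARNs): let S3 invoke LambdaA,
# then subscribe LambdaA to ObjectCreated events on the source log bucket.
import boto3

lambda_client = boto3.client('lambda')
s3 = boto3.client('s3')

SOURCE_BUCKET = 'account-a-log-bucket'                                          # placeholder
LAMBDA_A_ARN = 'arn:aws:lambda:ap-northeast-2:111111111111:function:LambdaA'    # placeholder

# Allow the S3 service to call the function for events from this bucket.
lambda_client.add_permission(
    FunctionName='LambdaA',
    StatementId='allow-s3-invoke',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn='arn:aws:s3:::' + SOURCE_BUCKET,
)

# Fire LambdaA whenever a new gzipped log object is created.
s3.put_bucket_notification_configuration(
    Bucket=SOURCE_BUCKET,
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'Id': 'forward-logs-to-account-b',
            'LambdaFunctionArn': LAMBDA_A_ARN,
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [{'Name': 'suffix', 'Value': '.gz'}]}},
        }]
    },
)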

# LambdaA (Account A): triggered by S3 ObjectCreated events on the source bucket,
# copies each new log object into the destination bucket in Account B.
import boto3
from urllib.parse import unquote_plus

def lambda_handler(event, context):
    # Client for the source bucket uses the Lambda execution role in Account A.
    source_client = boto3.client('s3')
    # Client for the destination bucket authenticates with credentials of an IAM user in Account B.
    destination_client = boto3.client('s3', aws_access_key_id='XXXXXXXX', aws_secret_access_key='XXXXXXXX')
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])
        # Stream the object from the source bucket and upload it to Account B's bucket under the same key.
        source_response = source_client.get_object(Bucket=bucket, Key=key)
        destination_client.upload_fileobj(source_response['Body'], 'TARGET_BUCKET_NAME', key)
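
Embedding static Account B access keys works, but an alternative is to let LambdaA assume a role in Account B via STS. A hedged sketch, assuming a role (placeholder ARN) that trusts Account A and allows s3:PutObject on the target bucket:

# Alternative to static keys (assumed role ARN): obtain temporary Account B credentials via STS.
import boto3

def destination_client():
    sts = boto3.client('sts')
    creds = sts.assume_role(
        RoleArn='arn:aws:iam::222222222222:role/cross-account-log-writer',  # placeholder
        RoleSessionName='lambda-a-log-copy',
    )['Credentials']
    return boto3.client(
        's3',
        aws_access_key_id=creds['AccessKeyId'],
        aws_secret_access_key=creds['SecretAccessKey'],
        aws_session_token=creds['SessionToken'],
    )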

# LambdaB (Account B): triggered by S3 ObjectCreated events on the destination bucket,
# gunzips each log object, parses the space-separated fields, and bulk-indexes them into Elasticsearch.
import boto3
import gzip
import io
from elasticsearch import Elasticsearch, helpers
from datetime import datetime
from urllib.parse import unquote_plus

def handler(event, context):
    s3client = boto3.client('s3')
    es = Elasticsearch(hosts='vpc-XXXX-XXXX.ap-northeast-2.es.amazonaws.com',
                       http_auth=('username', 'password'), scheme='https', port=443)
    docs = []
    # One index per day, e.g. someting-log-index-2021-01-01.
    index = 'someting-log-index-' + str(datetime.now().date())
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])
        response = s3client.get_object(Bucket=bucket, Key=key)
        body = response['Body'].read()
        with gzip.GzipFile(fileobj=io.BytesIO(body), mode='rb') as fh:
            lines = fh.read().splitlines()
            # Skip the first line (the header); field positions follow the default VPC Flow Log format.
            for cnt, line in enumerate(lines, start=1):
                if cnt == 1:
                    continue
                item = line.decode('utf-8').split()
                # Records with no data use '-' for packets and bytes; coerce them to 0 so they index as integers.
                if item[8] == '-':
                    item[8] = 0
                if item[9] == '-':
                    item[9] = 0
                row = {
                    "interface-id" : item[2],
                    "srcaddr"      : item[3],
                    "dstaddr"      : item[4],
                    "srcport"      : item[5],
                    "dstport"      : item[6],
                    "protocol"     : item[7],
                    "packets"      : int(item[8]),
                    "bytes"        : int(item[9]),
                    "start"        : datetime.utcfromtimestamp(int(item[10])).strftime('%Y-%m-%dT%H:%M:%S'),
                    "end"          : datetime.utcfromtimestamp(int(item[11])).strftime('%Y-%m-%dT%H:%M:%S'),
                    "action"       : item[12],
                    "log-status"   : item[13]
                }
                docs.append({
                    '_index': index,
                    '_source': row
                })
    # Send everything collected in this invocation as a single bulk request.
    helpers.bulk(es, docs)
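
To confirm the bulk insert landed, a quick check against the day's index can be run from any host that can reach the Elasticsearch endpoint. This uses the same placeholder host, credentials, and index name as the LambdaB snippet above:

# Quick verification (same placeholder endpoint/credentials as in LambdaB).
from elasticsearch import Elasticsearch
from datetime import datetime

es = Elasticsearch(hosts='vpc-XXXX-XXXX.ap-northeast-2.es.amazonaws.com',
                   http_auth=('username', 'password'), scheme='https', port=443)
index = 'someting-log-index-' + str(datetime.now().date())

print(es.count(index=index)['count'])                    # number of documents indexed today
print(es.search(index=index, size=1)['hits']['hits'])    # peek at one indexed document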