Lambda (Python): Copy S3 Logs to Another AWS Account and Bulk-Load Them into Elasticsearch
아래 흐름과 같이, AWS CloudWatch를 통해 A 계정의 S3에 저장된 로그를 A 계정의 Lambda(LambdaA)로 B 계정의 S3에 전달한 후, B 계정의 Lambda(LambdaB)에서 Elasticsearch로 데이터를 bulk로 넣는 방법입니다. 첫 번째 코드는 LambdaA, 두 번째 코드는 LambdaB입니다.
┌─ AWS Account A ──┐     ┌──────────── AWS Account B ─────────────┐
│                  │     │                                        │
│                  │     │                                        │
│ ┌─────────────┐  │     │ ┌────────────┐      ┌────────────────┐ │
│ │   S3 log    ├──┼─────┤►│   S3 log   ├─────►│ ElasticSearch  │ │
│ └─────────────┘  │  ▲  │ └────────────┘  ▲   └────────────────┘ │
│                  │  │  │                 │                      │
│                  │  │  │                 │                      │
│ ┌─────────────┐  │  │  │           ┌─────┴─────┐                │
│ │   LambdaA   ├──┼──┘  │           │  LambdaB  │                │
│ └─────────────┘  │     │           └───────────┘                │
│                  │     │                                        │
└──────────────────┘     └────────────────────────────────────────┘
import json
import boto3
from urllib.parse import unquote_plus
def lambda_handler(event, context):
    """Copy every S3 object named in an S3 event notification to the target bucket.

    Objects are read with this function's default boto3 credentials and
    written through a second S3 client configured with the destination
    account's access keys.

    Configuration (environment variables; the defaults are the original
    placeholders, so behaviour is unchanged when they are not set):
        DEST_AWS_ACCESS_KEY_ID      access key id for the destination client
        DEST_AWS_SECRET_ACCESS_KEY  secret access key for the destination client
        TARGET_BUCKET_NAME          bucket that receives the copies

    Args:
        event: S3 event notification; each entry of ``event['Records']`` names
            one source bucket/key.
        context: Lambda context object (unused).
    """
    import os

    source_client = boto3.client('s3')
    # Secrets are taken from the environment instead of being committed with
    # the code.
    destination_client = boto3.client(
        's3',
        aws_access_key_id=os.environ.get('DEST_AWS_ACCESS_KEY_ID', 'XXXXXXXX'),
        aws_secret_access_key=os.environ.get('DEST_AWS_SECRET_ACCESS_KEY', 'XXXXXXXX'),
    )
    target_bucket = os.environ.get('TARGET_BUCKET_NAME', 'TARGET_BUCKET_NAME')

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # unquote_plus decodes the URL-encoded object key carried in the event.
        key = unquote_plus(record['s3']['object']['key'])
        body = source_client.get_object(Bucket=bucket, Key=key)['Body']
        try:
            # upload_fileobj consumes the response body as a file-like object,
            # so the copy keeps the same key in the target bucket.
            destination_client.upload_fileobj(body, target_bucket, key)
        finally:
            # Release the HTTP connection behind the source object's body.
            body.close()
import boto3
import gzip
import io
from elasticsearch import Elasticsearch, helpers
from datetime import datetime
from urllib.parse import unquote_plus
def _flow_log_row(fields):
    """Build the Elasticsearch document for one whitespace-split log line.

    The column indexes below assume the default VPC Flow Log field order
    (version, account-id, interface-id, srcaddr, dstaddr, srcport, dstport,
    protocol, packets, bytes, start, end, action, log-status) -- TODO confirm
    against the configured log format.
    """
    from datetime import timezone

    def as_count(value):
        # A '-' placeholder in packets/bytes is stored as 0 so the field stays numeric.
        return 0 if value == '-' else int(value)

    def as_utc_iso(epoch_seconds):
        # Epoch seconds -> 'YYYY-MM-DDTHH:MM:SS' in UTC (no offset suffix).
        # Replaces the deprecated datetime.utcfromtimestamp with the same output.
        return datetime.fromtimestamp(
            int(epoch_seconds), tz=timezone.utc
        ).strftime('%Y-%m-%dT%H:%M:%S')

    return {
        "interface-id": fields[2],
        "srcaddr": fields[3],
        "dstaddr": fields[4],
        "srcport": fields[5],
        "dstport": fields[6],
        "protocol": fields[7],
        "packets": as_count(fields[8]),
        "bytes": as_count(fields[9]),
        "start": as_utc_iso(fields[10]),
        "end": as_utc_iso(fields[11]),
        "action": fields[12],
        "log-status": fields[13],
    }


def _iter_actions(s3client, records, index):
    """Yield a bulk-index action for each data line of the gzip S3 objects in *records*."""
    for record in records:
        bucket = record['s3']['bucket']['name']
        # unquote_plus decodes the URL-encoded object key carried in the event.
        key = unquote_plus(record['s3']['object']['key'])
        body = s3client.get_object(Bucket=bucket, Key=key)['Body'].read()
        with gzip.GzipFile(fileobj=io.BytesIO(body), mode='rb') as fh:
            lines = fh.read().splitlines()
        # The first line of every file is skipped as the header row.
        for line in lines[1:]:
            fields = line.decode('utf-8').split()
            if not fields:
                # A blank line would otherwise raise IndexError and abort the batch.
                continue
            yield {'_index': index, '_source': _flow_log_row(fields)}


def handler(event, context):
    """Bulk-index the log lines of the gzip S3 objects named in an S3 event.

    Configuration (environment variables; the defaults are the original
    placeholders, so behaviour is unchanged when they are not set):
        ES_HOST      Elasticsearch endpoint host
        ES_USERNAME  basic-auth user name
        ES_PASSWORD  basic-auth password

    Args:
        event: S3 event notification; each entry of ``event['Records']`` names
            one gzip-compressed log object.
        context: Lambda context object (unused).
    """
    import os

    s3client = boto3.client('s3')
    # Endpoint and credentials come from the environment so secrets are not
    # committed with the code.
    es = Elasticsearch(
        hosts=os.environ.get('ES_HOST', 'vpc-XXXX-XXXX.ap-northeast-2.es.amazonaws.com'),
        http_auth=(os.environ.get('ES_USERNAME', 'username'),
                   os.environ.get('ES_PASSWORD', 'password')),
        scheme='https',
        port=443,
    )
    # Daily index named from the Lambda's local clock (datetime.now()).
    # NOTE(review): 'someting' looks like a typo for 'something'; kept because
    # existing indices / index patterns may already depend on this name.
    index = 'someting-log-index-' + str(datetime.now().date())
    # Read and parse every object before sending anything, so a failed S3 read
    # or a malformed line indexes nothing.
    docs = list(_iter_actions(s3client, event['Records'], index))
    if docs:
        helpers.bulk(es, docs)
Read other posts