본문 바로가기
Generalist/data engineering

[AWS]AWS 기반 데이터분석 파이프라인 구축 - Analytics on AWS 워크숍[5]

by 홍원 2022. 5. 30.

전제조건


실습과정 미리보기

  • 람다 함수를 작성하여 Athena가 S3의 processsed data에서 Hits 별 Top 5 Popular Songs를 쿼리하고 가져 오는 코드를 호스팅

서비스명 서비스 설명 비고
AWS Athena Amazon Athena는 표준 SQL을 사용해 데이터를 간편하게 분석할 수 있는 대화식 쿼리 서비스  Athena는 서버리스 서비스이므로 관리할 인프라가 없으며 실행한 쿼리에 대해서만 비용을 지불함
AWS Lambda AWS Labmda는 서버를 관리하지 않고도 코드를 실행할 수 있는 AWS에서 제공하는 서버리스 컴퓨팅 서비스

서버리스란?
서버가 없는것이 아니라 서버에 대한 요청을 처리하는 로직을 함수단위로 정의하여 요청이 들어올때마다 함수를 호출하는 방식

생활코딩님의 람다 실습 영상

1. 람다 함수 생성

람다 서비스를 사용하기 위해 람다 함수를 생성

  • 리전이 us-east-1  선택되어 있는지 확인
  • Create function 클릭 

  • Function name : Analyticsworkshop_top5Songs
  • Runtime : Python 3.8
  • Permissions 아래의 Chane default execution role 를 확장하여, Create a new role with basic Lambda permissions 이 선택되어 있는지 확인
  • Create Function 클릭


2. 람다 함수 작성

방금 만든 람다 함수에 대한 코드를 작성합니다. 

import boto3
import time
import os

# Environment Variables
DATABASE = os.environ['DATABASE']
TABLE = os.environ['TABLE']
# Top X Constant
TOPX = 5
# S3 Constant
S3_OUTPUT = f's3://{os.environ["BUCKET_NAME"]}/query_results/'
# Number of Retries
RETRY_COUNT = 10

def lambda_handler(event, context):
    client = boto3.client('athena')
    # query variable with two environment variables and a constant
    query = f"""
        SELECT track_name as \"Track Name\", 
                artist_name as \"Artist Name\",
                count(1) as \"Hits\" 
        FROM {DATABASE}.{TABLE} 
        GROUP BY 1,2 
        ORDER BY 3 DESC
        LIMIT {TOPX};
    """
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={ 'Database': DATABASE },
        ResultConfiguration={'OutputLocation': S3_OUTPUT}
    )
    query_execution_id = response['QueryExecutionId']
    # Get Execution Status
    for i in range(0, RETRY_COUNT):
        # Get Query Execution
        query_status = client.get_query_execution(
            QueryExecutionId=query_execution_id
        )
        exec_status = query_status['QueryExecution']['Status']['State']
        if exec_status == 'SUCCEEDED':
            print(f'Status: {exec_status}')
            break
        elif exec_status == 'FAILED':
            raise Exception(f'STATUS: {exec_status}')
        else:
            print(f'STATUS: {exec_status}')
            time.sleep(i)
    else:
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OVER')
    # Get Query Results
    result = client.get_query_results(QueryExecutionId=query_execution_id)
    print(result['ResultSet']['Rows'])
    # Function can return results to your application or service
    # return result['ResultSet']['Rows']

boto3를 사용하여 Athena 클라이언트에 액세스합니다.

Boto는 Python 용 Amazon Web Services (AWS) SDK입니다. 이를 통해 Python 개발자는 EC2 및 S3와 같은 AWS 서비스를 생성, 구성, 관리 할 수 있습니다.

참고자료

- https://boto3.amazonaws.com/v1/documentation/api/latest/index.html?id=docs_gateway 

- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html 

 

 

코드 작성 모습


3. 환경 변수 설정

Lambda 함수에 대한 환경 변수를 사용하면 코드를 변경하지 않고도 설정을 함수 코드 및 라이브러리에 동적으로 전달할 수 있습니다.

 

Configuration 탭을 클릭하고 좌측의 Environment variables 선택

Edit클 클릭하여 아래의 3가지 환경 변수 추가.

  • Key: DATABASE, Value: analyticsworkshopdb
  • Key: TABLE, Value: processed_data
  • Key: BUCKET_NAME, Value: yourname-analytics-workshop-bucket

 

좌측의 General configuration 선택 후 Edit 클릭

  • Memory (MB) 는 기본값인 128 MB로 놔둡니다.
  • Timeout 을 10초로 변경합니다.
  • Save 클릭

4. 실행 역할 설정

  • Configuration 탭을 클릭하고 좌측의 Permissions 클릭:
    • Execution Role 아래의 Role 을 클릭하여 새로운 IAM 콘솔탭을 엽니다.
  • Attach policies 클릭
  • 다음 두 Policy을 추가합니다 (필터 박스에서 검색하고 선택 후 Attach policy를 누르십시오).

  • AmazonS3FullAccess
  • AmazonAthenaFullAccess
  • 이 Policy들이 Role에 추가 되었다면 탭을 닫습니다.


5. 테스트 이벤트 구성

이제 함수를 테스트 할 준비가 되었습니다. 함수 Code 섹션에서 Deploy를 클릭하여 먼저 함수를 배포합니다.

다음으로 새로 생성된 람다 함수의 실행 결과를 확인하기 위해 Dummy 테스트 이벤트를 구성 해 보겠습니다.

  • Code source 아래에서 Test 를 클릭합니다.
  • 테스트 이벤트를 구성 할 수있는 새 창이 나타납니다.
    • Create new test event 가 기본적으로 선택되어 있습니다.
    • Event template: Hello World
    • Event name: Test
    • 모든 것을 그대로 둡니다.
    • Create 클릭

 

  • Test 다시 클릭
  • Execution Result 섹션에서 json 형식의 출력을 볼 수 있습니다.


6. Athena를 통한 확인

Athena를 통해 결과를 확인합니다.

왼쪽 패널에서, Database 드롭 다운에서 analyticsworkshopdb 선택 한 뒤 아래 쿼리를 작성합니다

SELECT track_name as "Track Name",
    artist_name as "Artist Name",
    count(1) as "Hits" 
FROM analyticsworkshopdb.processed_data 
GROUP BY 1,2 
ORDER BY 3 DESC 
LIMIT 5;

쿼리 결과


현재 과정 되짚어보기

람다 함수를 작성하여 Athena가 S3의 processsed data에서 Hits 별 Top 5 Popular Songs를 쿼리하고 가져 오는 코드를 호스팅

 

 


다음편 보러가기

2022.05.30 - [Specialist/AWS] - [AWS]AWS 기반 데이터분석 파이프라인 구축 - Analytics on AWS 워크숍[6]

 

[AWS]AWS 기반 데이터분석 파이프라인 구축 - Analytics on AWS 워크숍[6]

전제조건 [AWS] AWS 기반 데이터 파이프라인 구축 - Analytics on AWS 워크숍[5] 실습과정 미리보기 Amazon Redshift 클러스터를 설정하고 S3 데이터를 Amazon Redshift로 로드 1. Redshift IAM Role 생성 이 단계에서는

khw742002.tistory.com

 

댓글