작은하마

[AWS] GLUE의 배치결과를 Slack으로 전송하기 본문

AWS/Glue

[AWS] GLUE의 배치결과를 Slack으로 전송하기

꼬몽울 2021. 7. 9. 15:32

Glue에서 수많은 잡을 만들어서 일배치를 하고있는데 매일 저녁, 새벽 이 배치가 잘 끝났는지 아니면 오류가나서 중지된건지 매일 일일이 들어가서 보기가 너무너무 귀찮아 Slack으로 배치 결과를 보내기로했다.

 

원래는 오류가 나면 CloudWatch에서 SNS로 이메일로 오류를 전송해주거나 따로 JOB아래에 SNS로 결과를 이메일로 전달해주는 코드로 확인하곤 했는데 이번에는 Slack이라는 좋은 협업툴이 있어 이쪽으로 전송을 하게되었다.

파란색 박스의 Job들이 각각 Log를 떨구어 빨간색 박스에 있는 Job을 트리거를 할수 있게 만들었다

빨간 박스에 있는 Job에서 각각의 log파일을 파싱하여 중단을 할지 뒷단의 MT Job을 진행시킬지 결정한다.

result=s3.list_objects_v2(Bucket=bucket, Prefix=key_path)
for item in result['Contents']:
    files = item['Key']
    file_nm=files.split('/')[2]
    # print(file_nm[:20])
    if file_nm[:20] == f'log_{date}':
        filenames.append(file_nm)
    if file_nm[:20] in loglist and file_nm[20:22] in ('18','19','20','21','22','23'):
        filenames.append(file_nm)

boto3의 list_objects_v2를 이용하여 s3안의 파일리스트를 가져온다.

결과를 파싱파여 배치날짜에 맞는 로그파일을 가져오게된다. 두번째 if문이 저런이유는 각각의 프로젝트마다 특이사항이 있는데 여기는 안타깝게도 로그파일을 s3://bucket/yyyy/mm/dd/ 이런식으로 저장한게 아니라 하나의 폴더로 저장을 하기 때문에 배치가 끝나는 대략적인 시간에 떨어진 파일을 가져오기 위하여 하드코딩을 진행하였다.(잘못됨)

 

for file in filenames:
    obj = s3.get_object(Bucket=bucket, Key=key_path+file)
    df = pd.read_csv(obj['Body'],names=['jobnm','tnm','gb','etl_exec_dt','etl_dt','etl_yn','result_cnt','start_dtm','end_dtm'])
    suc=len(df.loc[df['succ_yn']=='Y'].index)
    fail=len(df.loc[df['succ_yn']=='N'].index)
    name=file.split('_')[0]
    conts.append(f"{name} 성공 : {suc}건 실패 : {fail}건")
    if fail>0:
        conts.clear()
        conts.append(f"{name}의 실패 : {fail}건 발생으로 JOB 중지")

해당 로그파일은 CSV로 떨어졌기 떄문에 CSV파일을 읽어와 dataframe으로 만든다음 간단한 처리를 진행하였다.

if fail>0 은 해당 로그 파일에서 succ_yn이 N일시 무조건 중지를 해야하기 때문에

Conts에 저장된 값들을 비우고 한줄의 값을 추가하는 기능을 만들었다.

 

text="\n".join(conts)
token = ""
attachments = [
         {
             "color": "#07abf7",
             "pretext": "" ,
             "title": "Job Result" ,
             "text": f"{text}",
             "footer": "",
             "mrkdwn_in": ["text", "pretext"]
         }
]
post_message(token,"#dptf_analysis", attachments)
if len(conts)==1:
    sys.exit('STOP')

여기서 conts에 쌓인 원소들을 한줄씩 전달해주기 위해 join을 사용

 

아래에서 전달을 해주는 것으로 끝나고

 

맨 마지막에는 Job중단을 위해서

Sys.exit()을 추가하여 Job이 중단 될 수 있도록 하였다.

Comments