이전 글
https://jjongsk.tistory.com/entry/Instagram-Crawler-Deployment-Using-AWS-Design
개발 요구사항
볼드 처리된 요구사항들은 개발이 필요한 요구사항들이며 나머지는 이미 개발이 되었거나 크롤링과 별개로 개발해야하는 요구사항들이다.
- 특정 맛집 인플루언서 아이디 배열이 주어지며 모든 각 아이디의 피드를 검색한다.
- rapidAPI는 1초에 8번의 rate limit이 존재한다.
- 크롤링은 pagination 방식으로 동작하며 api 요청에 5초의 시간이 소요되며 api 결과값으로 반환되는 pagination token이 있어야 다음 페이지를 불러올 수 있다.
- 피드는 각 계정당 1000개 이상일 수도 있으며 각 피드당 10개 이상의 동영상, 사진이 있을 수 있다.
- 크롤러는 rapid API를 사용하며 값을 정규화한 뒤 GPT API를 사용하여 필요한 값을 추출해낸다.
- 최대한 비용 효율적인 방법을 선호하며 도중 중단되어도 자동으로 다시 시도만 된다면 상관 없다.
- 최소 며칠에 한 번은 동작하도록 설정 가능해야한다.
- 확장성이 좋아야한다.
- 비용 절감을 위해 GPT API batch response 기능을 사용하며 이는 첫 요청 시 특정 url을 주며 24시간 내 임의의 시간에 gpt가 결과 값을 해당 url에 파일로 게시해주는 방식이다. 이 때 여러 요청을 batch 처리 된 파일이 결과값으로 출력된다.
- 정규화된 해당 결과값을 기반으로 네이버 지도 앱에 검색하여 맛집 정보를 얻는다. 이 때 여러개의 맛집이 나올 수 있으나 이 중 가장 일치하는 하나를 찾아내야한다.
- 찾아낸 뒤 mongoDB에 해당 맛집 정보를 올린다.
- 여기서는 배포 과정만 신경쓰면 되며 API 작성, 정규화, 가장 일치하는 맛집 찾기 등의 작업은 이미 프로그래밍 되어있다.
개발
확장성, GPT API, Batch, rate limit등의 요구사항을 만족하기위해 AWS를 사용할 예정이며 특히 확장성과 rapid api의 rate limit을 관리하는 것이 중요하기에 lambda, sqs에 의존하여 개발하기로 결정하였다.
큐
먼저 rapid api에는 1초에 8번의 rate limit이 존재하기에 sqs와 lambda의 동시 실행 설정에 의존하여 개발하기로 결정하였으며 한 개의 메시지에 대해 한 번의 처리만 해야하기에 fifo queue로 만들어 한 번의 실행을 보장해도록 개발하였다.
먼저 메시지 저장을 위한 크롤링 queue를 추가해준다.
이후 큐에 요청하는 코드를 작성해준다. 큐에 들어갈 메시지의 내용으로는 유저 ID, pagination을 위한 토큰, 최신 포스트만 가져오기 위한 마지막 업데이트 시간을 같이 보내준다. 이 때 마지막 업데이트 시간이 없다면 유저의 모든 인스타그램 post를 순회하며 데이터를 가져와야한다.
단 pagination token 값은 처음 queue에는 null값으로 들어가지만 크롤링 과정이 serialize하게 동작하기에 람다 내부에서 post response를 파싱하는 과정에서 다음 페이지 요청이 필요할 때 sqs에 pagination token과 함께 메시지를 전달해주며 만약 마지막 페이지인 경우 sqs에 메시지를 보내지 않는 방식으로 동작하도록 개발해준다.
아래는 테스트를 위해 sqs에 메시지를 전송하는 간단한 임시 코드이다.
const userIds = INSTAGRAM_USER_IDS;
async function main() {
await Promise.all(
userIds.map(async (userId) => {
let lastTakenAt= null;
try {
if (!isAll) {
const result = await collections.instagramPosts.findOne({id})
}
await sendMessageToFIFO<ISendInstagramPostMessage>({
userId,
lastTakenAt,
pageToken: null
});
logger.info("Message sent", { userId });
} catch (e) {
logger.error("Failed to send message", e);
}
})
);
await dbClient.close();
}
main();
이제 코드를 실행하여 잘 동작하는지 확인해보자
메시지 큐에 메시지가 잘 들어가 있는 것을 확인할 수 있다. 이제 다음으로 큐의 메시지를 기반으로 람다 함수를 실행해주면 된다. 이 때 1초 8번의 rate limit은 lambda의 동시실행 함수 개수에 제한을 둬 지킬 수 있도록 개발해준다. 단 8개 그대로 설정하면 너무 빡빡해 문제가 발생할 여지가 있으니 1초 6번 실행되도록 함수를 구현한다.
큐에 메시지가 들어오면 람다가 실행되도록 트리거를 추가해준다. sqs의 콘솔에서 쉽게 추가할 수 있다.
이 때 그냥 추가하려하면 권한 이슈가 발생하니 람다 함수에 실행 역할을 추가해주자.
람다의 permission 탭에 sqs 실행 권한을 추가해준다.
이후 다시 확인해보면 정상적으로 람다 트리거가 등록되는 것을 확인할 수 있다.
이 때 주의해야 할 점은 람다는 기본적으로 트리거에 의해 실행될 때 batch처리를 하도록 설정되어 있기 때문에 이 값을 1로 제한하여 batch처리 되지 않도록 변경해야 한다. 다만 이는 프로덕트의 환경에 따라 다르니 본인에게 맞게 설정하도록 하자
또한 동시 실행 개수도 6개로 제한하였다.
이제 메시지 큐에 값을 추가한 뒤 람다를 확인해보면 잘 실행 되는것을 확인할 수 있다.
인스타그램 포스트의 업로드 날짜를 기준으로 이미 가져온 이전 포스트는 제외하고 최신 포스트까지 가져오도록 코드를 작성해준다. 크롤러는 최신 내역부터 가져오기에 만약 제외된 내역이 있다면 해당 페이지가 마지막 페이지라고 가정하고 더 이상 큐에 메시지를 보내지 않도록 동작해야한다. 필터링 내역이 없다면 다음 pagination token과 함께 sqs에 메시지를 넣어줘 다음 페이지를 람다가 가져올 수 있도록 개발해준다.
export const handler = async (event) => {
// 메시지 큐에서 데이터를 꺼냄
const { userId, lastTakenAt, pageToken } = event
// key값들을 기반으로 인스터그램 크롤링 진행
const postsFromProfileGrid = await fetchPostsFromProfileGrid({
usernameOrIdOrUrl: userId,
paginationToken: pageToken,
});
const posts = postsFromProfileGrid.body.data.items;
const { paginationToken } = postsFromProfileGrid.body;
// 포스트가 없으면 마지막페이지
if (!posts || posts.length === 0) {
return null;
}
// 정규화 및 데이터 저장
// ...
// paginationToken이 있으면 다음 크롤링을 위해 sqs에 넣어줌
if (paginationToken) {
await sendMessageToFIFO({
userId,
lastTakenAt,
pageToken: paginationToken,
});
}
};
export default handler;
sqs를 트리거하여 람다를 실행시키는 로직은 이미 개발이 되어있기에 위의 코드를 실행시키면 pagination token이 존재하지 않을때까지 sqs와 lambda의 순으로 알아서 코드가 실행되며 크롤링을 진행할것이다.
함수를 람다에 올려보자 이 때 프로젝트를 .zip형태로 압축해줘야하는데. Mac finder의 기본 compress 기능을 사용하면 문제가 발생한다는 이슈가 있으니 아래 명령어를 통해 압축하여 람다에 올려준다.
> zip -r ../instagram_crawler.zip *
docker 이미지 형태로 불러오거나 s3에 zip 파일을 저장하는 것이 좋지만 개발단계이기에 일단 직접 올린 뒤 실행해준다.
index.js 파일의 위치가 src/index.js에 있으니 runtime setting의 경로 또한 변경해준다.
이후 동작에 필요한 파일, env를 추가해준다
이어서 버전을 추가해준 뒤 테스트를 진행해본다. 일단은 timeout 에러가 발생하는 것을 확인할 수 있었다.
START RequestId: Version: $LATEST
info: Latest fetchedAt in instagramPosts:
info: targetAt:
info: Fetching posts from PROFILE_GRID...
END RequestId:
REPORT RequestId: Duration: 3000.00 ms Billed Duration: 3000 ms Memory Size: 128 MB Max Memory Used: 88 MB Init Duration: 552.41 ms Status: timeout
configuration으로 가서 timeout을 확인해보니 3초로 설정되어있는데 이를 1분으로 늘려서 테스트를 다시 진행해보자
이후 다시 테스트를 해 본 결과 잘 동작하는 것을 확인할 수 있었다.
loudwatch에도 로그가 잘 남는 것을 확인할 수 있다.
info: Latest fetchedAt in instagramPosts:
info: targetAt:
info: Fetching posts from PROFILE_GRID...
info: Fetched 12 posts for _ at null
info: No more new posts to fetch
info: 0 new posts found in profile grid
info: Fetching posts from REELS_SECTION...
info: Fetched 12 posts for _ at null
info: No more new posts to fetch
info: 0 new posts found in reels section
이제 sqs와 람다 개발 로직은 모두 잘 동작하는것을 확인했으니 전체 로직이 잘 동작하는지 테스트를 진행할 차례이다. 적절한 개수의 post가 존재하는 user를 사용하여 sqs에 메시지를 추가해주었다.
한 번의 람다 동작에서 12개의 인스타그램 post를 가져 오며 유저의 총 post 개수는 94개 이기에 람다 함수가 총 8번 실행되면 예상대로 잘 동작하는 것이다.
람다의 cloudWatch에서 확인 해 본 결과 함수가 총 8번 실행되었으며 예상대로 잘 동작하는 것을 확인할 수 있었다.
이제 다음으로는 유저의 id 리스트를 가지고 있는 최초에 이 코드를 실행시켜주는 트리거 역할을 하는 함수, 주기적으로 해당 함수가 동작하도록 하는 로직을 개발, 로깅, 정규화 등의 작업을 진행해보겠다.
'aws' 카테고리의 다른 글
[AWS]기타 서비스 (2) | 2024.08.25 |
---|---|
[AWS]비용 최적화 아키텍쳐 (0) | 2024.08.22 |
[AWS]보안성 아키텍처 (0) | 2024.08.20 |
Instagram Crawler Deployment Using AWS: Design (0) | 2024.08.15 |
AWS Certified Solutions Architect - Associate 취득 후기 (0) | 2024.08.13 |