DynamoDB Streams batch processing 설정
console에서 이벤트 매핑시 uuid가 나오지 않기 때문에 sdk로 새로 매핑 시켜야 한다.
$ aws lambda create-event-source-mapping \\
--function-name dev-kinx-midibus-vod-playdata-stream \\
--batch-size 1000 --maximum-batching-window-in-seconds 30 \\
--event-source-arn arn:aws:dynamodb:ap-northeast-2:834444597251:table/data_play_vod/stream/2021-01-06T03:22:58.558 --starting-position LATEST
{
"UUID": "5120f537-b328-42b4-9161-cec7b1460f58",
"StartingPosition": "LATEST",
"BatchSize": 1000,
"MaximumBatchingWindowInSeconds": 30,
"ParallelizationFactor": 1,
"EventSourceArn": "arn:aws:dynamodb:ap-northeast-2:834444597251:table/data_play_vod/stream/2021-01-06T03:22:58.558",
"FunctionArn": "arn:aws:lambda:ap-northeast-2:834444597251:function:dev-kinx-midibus-vod-playdata-stream",
"LastModified": "2021-02-02T16:34:30.780000+09:00",
"LastProcessingResult": "No records processed",
"State": "Creating",
"StateTransitionReason": "User action",
"DestinationConfig": {
"OnFailure": {}
},
"MaximumRecordAgeInSeconds": -1,
"BisectBatchOnFunctionError": false,
"MaximumRetryAttempts": -1,
"TumblingWindowInSeconds": 0,
"FunctionResponseTypes": []
}
→ 30초 마다 Lambda가 실행됨
실시간으로 보이는 count 집계를 위한 프로세스라 FIFO를 사용하려다, 단순히 이전 카운트에 1을 더하는 작업이기 떄문에 무리가 없을 것 같아 더 높은 성능과 적은 비용을 가지는 일반 큐로 생성했다.
SQS는 sender와 receiver가 필요하다. sender는 DynamoDB의 Streams기능의 트리거 람다 함수가 될 것이고 receiver가 될 람다 함수는 아래와 같이 간단하게 받은 메세지를 출력하도록 작성했다.
SDK로 작성시 SQS와 매핑하는 설정 부분이 필요하지만 콘솔에서 트리거 설정을 한다면 다른 설정 코드는 필요없다.
exports.handler = async function(event, context) {
let insertedObjects = [];
event.Records.forEach( record => {
const { body } = record;
console.log(JSON.parse(body));
});
}
channelObjectForSQS
배열에 channelId - objectId 형태의 문자열을 추가한다.async function setDataStructure(record, dataRefined,channelObjectForSQS ) {
try {
const evt = record.eventName;
if (evt !== "INSERT" && evt !== "MODIFY") return;
const dynamodb = record.dynamodb;
let { uuid, date, channelId, objectId } = dynamodb.NewImage;
if (evt === "INSERT") {channelObjectForSQS.push(`${channelId.S}-${objectId.S}`); };
.....
Body
에 담아 메세지를 전송한다.// 이벤트 큐 보내기
if ( channelObjectForSQS.length ) {
const stringifyData = JSON.stringify(channelObjectForSQS);
const sqsParam = {
MessageBody: stringifyData,
QueueUrl: SQS_URL
}
console.log(sqsParam);
try {
sqs.sendMessage(sqsParam).promise();
console.log("SENDING SQS MESSAGE FINISEHD")
} catch(err) {
console.error("SQS ERROR >> ", err);
}
}