구성

  1. DynamoDB Streams batch processing 설정

    console에서 이벤트 매핑시 uuid가 나오지 않기 때문에 sdk로 새로 매핑 시켜야 한다.

    $ aws lambda create-event-source-mapping \\
    --function-name dev-kinx-midibus-vod-playdata-stream \\
    --batch-size 1000 --maximum-batching-window-in-seconds 30 \\
    --event-source-arn arn:aws:dynamodb:ap-northeast-2:834444597251:table/data_play_vod/stream/2021-01-06T03:22:58.558 --starting-position LATEST
    
    {
        "UUID": "5120f537-b328-42b4-9161-cec7b1460f58",
        "StartingPosition": "LATEST",
        "BatchSize": 1000,
        "MaximumBatchingWindowInSeconds": 30,
        "ParallelizationFactor": 1,
        "EventSourceArn": "arn:aws:dynamodb:ap-northeast-2:834444597251:table/data_play_vod/stream/2021-01-06T03:22:58.558",
        "FunctionArn": "arn:aws:lambda:ap-northeast-2:834444597251:function:dev-kinx-midibus-vod-playdata-stream",
        "LastModified": "2021-02-02T16:34:30.780000+09:00",
        "LastProcessingResult": "No records processed",
        "State": "Creating",
        "StateTransitionReason": "User action",
        "DestinationConfig": {
            "OnFailure": {}
        },
        "MaximumRecordAgeInSeconds": -1,
        "BisectBatchOnFunctionError": false,
        "MaximumRetryAttempts": -1,
        "TumblingWindowInSeconds": 0,
        "FunctionResponseTypes": []
    }
    

→ 30초 마다 Lambda가 실행됨

  1. SQS 생성

실시간으로 보이는 count 집계를 위한 프로세스라 FIFO를 사용하려다, 단순히 이전 카운트에 1을 더하는 작업이기 떄문에 무리가 없을 것 같아 더 높은 성능과 적은 비용을 가지는 일반 큐로 생성했다.

https://s3-us-west-2.amazonaws.com/secure.notion-static.com/621deacc-1dea-4c94-9d7a-7e751436f41b/Untitled.png

SQS는 sender와 receiver가 필요하다. sender는 DynamoDB의 Streams기능의 트리거 람다 함수가 될 것이고 receiver가 될 람다 함수는 아래와 같이 간단하게 받은 메세지를 출력하도록 작성했다.

SDK로 작성시 SQS와 매핑하는 설정 부분이 필요하지만 콘솔에서 트리거 설정을 한다면 다른 설정 코드는 필요없다.

exports.handler = async function(event, context) {
  let insertedObjects = [];
  event.Records.forEach(  record => {
    const { body } = record;
    console.log(JSON.parse(body));
  });
}

구현

  1. INSERT 작업이 일어날 때마다 channelObjectForSQS 배열에 channelId - objectId 형태의 문자열을 추가한다.
async function setDataStructure(record, dataRefined,channelObjectForSQS ) {
  try {
    const evt = record.eventName;
    if (evt !== "INSERT" && evt !== "MODIFY") return;

    const dynamodb = record.dynamodb;
    let { uuid, date, channelId, objectId } = dynamodb.NewImage;
    
    if (evt === "INSERT") {channelObjectForSQS.push(`${channelId.S}-${objectId.S}`); };

.....
  1. Queue 메세지 파라미터 Body 에 담아 메세지를 전송한다.
// 이벤트 큐 보내기
    if ( channelObjectForSQS.length ) {
      const stringifyData = JSON.stringify(channelObjectForSQS);
        const sqsParam = {
        MessageBody: stringifyData,
        QueueUrl: SQS_URL
      }
      console.log(sqsParam);
   
      try {
        sqs.sendMessage(sqsParam).promise();
        console.log("SENDING SQS MESSAGE FINISEHD")
      } catch(err) {
        console.error("SQS ERROR >> ", err);
      }

    }