로직

사용자가 live 재생시 uuid 를 Primary Key로하는 'data_play_live' 테이블에 재생 내역이 저장된다.

https://s3-us-west-2.amazonaws.com/secure.notion-static.com/3ac1378c-d545-4731-bdd6-3ee7cbaaa986/Untitled.png

플레이어는 5초마다 재생 데이터를 POST로 요청하고, 같은 uuid 에서 온 요청은 새로 들어온 날짜로 endDate가 갱신된다.

'data_play_live' 테이블에서 INSERT / MODIFY 이벤트가 발생하게 되면 DynamoDB Stream 기능이 실행된다. Stream lambda는 재생 데이터 row data를 기반으로 'data_play_live_agg_channel' 테이블과 'data_play_live_agg_stream' 테이블에 각각 channelId 별, 그리고 streamId 별 재생 수를 집계하여 저장한다.

https://s3-us-west-2.amazonaws.com/secure.notion-static.com/675c273e-3b37-4037-ac25-d36322f199aa/Untitled.png

데이터는 시간을 기준으로 날짜/시간/분 단위로 재생수가 집계된다. SK (Sort Key) count는 누적 count수를 의미한다. 재생 데이터 테이블에서 INSERT 이벤트가 발생할 때만 1씩 증가된다.

SK가 log 로 시작하는 데이터 들은 시간별 집계 데이터이다.

DynamoDB의 update 작업에서 ADD Number 작업을 요청하면 해당 Key를 가진 데이터가 없을 경우 자동으로 기본값을 0으로 지정해 1을 더해준다. 따라서 처리하고 있는 데이터의 date 파라미터에서 날짜, 시간, 분 까지의 데이터로 나눠 update만 해주면 자동으로 데이터가 더해지거나 새로 생성된다.

const updateCountIfRowInserted = async(tableName, PK, logDate) => {

  const params = {
    paramC: setDdbParam(tableName, {
      ...PK,
      SK: "count"
    }),
    paramM: setDdbParam(tableName, {
      ...PK,
      SK: `log-m-${logDate.logDateM}`,
    }),
    paramH: setDdbParam(tableName, {
      ...PK,
      SK: `log-h-${logDate.logDateH}`,
    }),
    paramD: setDdbParam(tableName, {
      ...PK,
      SK: `log-d-${logDate.logDateD}`,
    })
  };

  const updatePromises = Promise.all([
    ddb.update(params.paramC).promise(),
    ddb.update(params.paramM).promise(),
    ddb.update(params.paramH).promise(),
    ddb.update(params.paramD).promise()
  ]);

INSERT 이벤트의 경우 카운트 증가만 시켜주면 되지만 MODIFY 이벤트는 이전 값과 비교해 달라진 경우에만 증가시켜야 한다.

예를 들어 2021년 2월 24일 13시 10분 3초에 재생 데이터로 카운트가 1씩 증가한 경우, 그 다음 요청으로 들어오는 2021년 2월 24일 13시 10분 8초에 들어온 재생 데이터로는 날짜, 시간, 분 중 어떤 값도 변경되지 않았기 때문에 증가되는 카운트 값이 없어야한다.

const updateCountIfRowModified = async(tableName, PK, logDate, oldLogDate) => {
  // no need to add 1 to SK-count
  // check if time data changed

  const oldLogDateM = oldLogDate.slice(0, -2);
  const oldLogDateH = oldLogDateM.slice(0, -2);
  const oldLogDateD = oldLogDateH.slice(0, -2);

  let updatePromiseParams = [];
  if (oldLogDateM !== logDate.logDateM) {
    updatePromiseParams.push(
      ddb
      .update(
        setDdbParam(tableName, { ...PK, SK: `log-m-${logDate.logDateM}` })
      )
      .promise()
    );

    // update max
    await checkMaxConcurrent(tableName, PK, oldLogDateM)
  }
  if (oldLogDateH !== logDate.logDateH) {
    updatePromiseParams.push(
      ddb
      .update(
        setDdbParam(tableName, { ...PK, SK: `log-h-${logDate.logDateH}` })
      )
      .promise()
    );
  }
  if (oldLogDateD !== logDate.logDateD) {
    updatePromiseParams.push(
      ddb
      .update(
        setDdbParam(tableName, { ...PK, SK: `log-d-${logDate.logDateD}` })
      )
      .promise()
    );
  }
  await promiseHandler(Promise.all(updatePromiseParams));
};