Счетчик посещений для AWS Python Lambda на базе DynamoDB

Я изучаю, как использовать DynamoDB и различные сервисы AWS, поэтому я решил написать простое приложение для подсчета посещений, чтобы научиться пользоваться DynamoDB. Это приложение дедуплицирует попадания от одного и того же клиента для одной и той же страницы, сохраняя хэш их пользовательского агента и IP-адреса. Он также хранит 1 запись о попаданиях в день и накапливает их во временной области. Я ищу отзывы о том, как я структурировал вызовы динамо-машины и возможные проблемы, которые у нее есть. Я подозреваю, что могу использовать последовательное чтение, когда мне это не нужно. Мне также любопытно, использовал ли я правильный шаблон для обработки upsert в Dynamodb.

Таблица Dynamo db имеет хэш-ключ «url» и поле сортировки, называемое «as_of_when», в котором примерно указано, когда произошли совпадения.

"""
A simple hit counter stuff to learn the various apis of dynamodb.
"""

import datetime
import os
import hashlib
import base64

import boto3
from botocore.exceptions import ClientError
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
from pybadges import badge

patch_all()

TABLE_NAME = os.getenv("TABLE_NAME", "hit_counts")

dynamo = boto3.client('dynamodb')


def get_today() -> str:
    """
    Gets a formatted date string
    """
    return datetime.datetime.now().strftime("%Y-%m-%d")

def get_full_timestamp() -> str:
    """
    Gets a complete date string
    """
    return datetime.datetime.now().strftime("%Y%m%d%H%M%S")

def get_previous_total(url:str):
    """
    Gets the most recent known total. Should be used before inserting a new instance
    """
    response = dynamo.query(
        TableName=TABLE_NAME,
        Select="ALL_ATTRIBUTES",
        Limit=1,
        KeyConditionExpression="the_url=:urlval",
        ScanIndexForward=False,
        ExpressionAttributeValues={
            ":urlval": {
                "S": url
            }
        }
    )
    if response['Count'] == 0:
        return 0
    return response['Items'][0]['accumulated_count']['N']

@xray_recorder.capture('insert_new_entry')
def insert_new_entry(url:str, as_of_when : str, user_hash : str, user_id : str):
    the_count = str(int(get_previous_total(url)) + 1)

    result = dynamo.put_item(
        TableName=TABLE_NAME,
        Item={
            'the_url': {
                'S': url
            },
            'as_of_when': {
                'S': as_of_when
            },
            'today_count': {
                'N': '1'
            },
            'accumulated_count': {
                'N': the_count
            },
            'user_id_hashes': {
                'SS': [user_hash]
            },
            'user_hashes': {
                'SS': [user_id]
            }
        },
        ReturnValues="ALL_OLD",
        ReturnItemCollectionMetrics="SIZE",
        ReturnConsumedCapacity='TOTAL',
        ConditionExpression='attribute_not_exists(the_url) and attribute_not_exists(as_of_when)'
    )
    print('insert_result', result)

    
    return result

@xray_recorder.capture('update_existing_entry')
def update_existing_entry(url:str, as_of_when : str, user_hash : str, user_id : str):
    result = dynamo.execute_statement(
        Statement=f"""
            UPDATE {TABLE_NAME}
            SET today_count = today_count + 1
            SET accumulated_count = accumulated_count + 1
            SET user_hashes = set_add(user_hashes, ?)
            SET user_id_hashes = set_add(user_id_hashes, ?)
            WHERE the_url = ? AND as_of_when = ? RETURNING ALL NEW *
        """,
        Parameters=[
            {
                "SS": [user_id]
            },
            {
                "SS": [user_hash]
            },
            {
                "S": url
            },
            {
                "S": as_of_when
            }
        ]
    )
    return result

@xray_recorder.capture('get_todays_entry')
def get_todays_entry(url:str, as_of_when : str):
    result = dynamo.get_item(
        TableName=TABLE_NAME,
        Key={
            'the_url': {
                'S': url
            },
            'as_of_when': {
                'S': as_of_when
            }
        },
        AttributesToGet=[
            'today_count',
            'accumulated_count',
            'user_hashes',
            'user_id_hashes'
        ],
        ConsistentRead=True
    )

    print('get_todays_entry', result)
    if 'Item' in result:
        return result['Item']
    return None


def increment_hit_count(url:str, as_of_when : str, user_hash : str, user_id : str):
    """
    increments a counter instance in the dynamo table
    """
    current_hits = get_todays_entry(url, as_of_when)

    if current_hits is None:
        # Insert new entry
        x = insert_new_entry(url, as_of_when, user_hash, user_id)
        current_hits = {
            'accumulated_count': {
                'N': "1"
            }
        }
    else:
        # Check for existence in existing object
        print(current_hits['user_id_hashes'])
        print(user_id)
        if user_hash not in current_hits['user_id_hashes']['SS']:
            result = update_existing_entry(url, as_of_when, user_hash, user_id)
            if 'Items' in result:
                current_hits = result['Items'][0]
            print(result)

    
    return  current_hits['accumulated_count']['N']




def hash_api_gateway_event(event : dict):
    reqContext = event['requestContext']['http']
    reqString = ':'.join((reqContext['sourceIp'], reqContext['protocol'] + reqContext['userAgent']))
    
    m = hashlib.md5()
    m.update(reqString.encode('utf-8'))

    return base64.b64encode(m.digest()).decode('utf-8'), reqString


def handler(event : dict, context):
    """
    # Invoked via query string parameters from an image tag
    # Returns a SVG unless text parameter is set.
    """
    print(event)
    print("hello")

    user_hash, og_user_id = hash_api_gateway_event(event)
    print(user_hash, og_user_id)

    if 'queryStringParameters' in event:
        url = event['queryStringParameters']['url']
        result = increment_hit_count(url, get_today(), user_hash, og_user_id)

        print(result)


    body =  badge(left_text="Total Views", right_text=result)

    return {
        "statusCode": 200,
        "isBase64Encoded": False,
        "headers": {
            "Content-Type": "image/svg+xml"
        },
        "body": body
    }

2 ответа
2

[Edited following discussion in the comments]

Несколько кратких заметок по вашему коду:

  1. Подсказки вашего типа могут быть более полными / описательными. У вас нет аннотаций к возврату; то context параметр в вашем handler функция не имеет подсказки типа; и аннотируя ваш event параметр в handler а также hash_api_gateway_event с голым dict очень мало сообщает контролеру типов. Было бы намного лучше аннотировать эти параметры параметризованным dict который информирует средство проверки типов об ожидаемых типах ключей и значений словаря. Вы также можете рассмотреть typing.TypedDict, который позволяет вам указать тип значения, которое вы ожидаете связать с отдельными ключами в словаре.
  2. Было бы неплохо иметь строки документации во всех ваших общедоступных функциях, даже если это всего лишь краткое однострочное описание того, чего достигает ваша функция.
  3. «Плоский лучше, чем вложенный», поэтому подумайте об использовании from datetime import datetime а также from base64 import bs4encode вместо того import datetime а также import base64. В обоих случаях вы используете только один класс / функцию из соответствующего модуля; он делает ваш код более лаконичным и читаемым без какой-либо двусмысленности в отношении того, что делают функции / классы; и немного эффективнее не искать datetime класс в datetime модуль всякий раз, когда вы хотите его использовать.

Я мало знаю о DynamoDB, поэтому не могу комментировать, что пытается выполнить ваш код, но с этим (большим) предостережением и за исключением пунктов, которые я упомянул выше, ваш код выглядит довольно чистым с моей точки зрения.

  • @Reinderien: да, достаточно справедливо. Я полагаю, что важно четко указать, что функция выполняет что-то динамическое, а не извлекает какое-либо кэшированное свойство. Отредактировали это; спасибо за критику.

    — Алекс Вэйгуд

Для вашего формата даты

datetime.datetime.now().strftime("%Y-%m-%d")

более прямо выражается как

datetime.date.today().isoformat()

Для этого выражения:

return response['Items'][0]['accumulated_count']['N']

если вы знаете, что в ответе только один элемент, тогда

item, = response['Items']
return item['accumulated_count']['N']

будет более явным и предложит бесплатно утверждение, что был возвращен ровно один товар.

Этот способ:

def insert_new_entry(url:str, as_of_when : str, user_hash : str, user_id : str):

имеет непоследовательный интервал вокруг подсказок типа; это должно выглядеть как url: str. Кроме того, вам не хватает подсказки типа возврата, которая в данном случае совпадает с типом из dynamo.put_item.

print('insert_result', result) и подобное должно быть преобразовано в вызов журнала.

    Добавить комментарий

    Ваш адрес email не будет опубликован. Обязательные поля помечены *