Я изучаю, как использовать DynamoDB и различные сервисы AWS, поэтому я решил написать простое приложение для подсчета посещений, чтобы научиться пользоваться DynamoDB. Это приложение дедуплицирует попадания от одного и того же клиента для одной и той же страницы, сохраняя хэш их пользовательского агента и IP-адреса. Он также хранит 1 запись о попаданиях в день и накапливает их во временной области. Я ищу отзывы о том, как я структурировал вызовы динамо-машины и возможные проблемы, которые у нее есть. Я подозреваю, что могу использовать последовательное чтение, когда мне это не нужно. Мне также любопытно, использовал ли я правильный шаблон для обработки upsert в Dynamodb.
Таблица Dynamo db имеет хэш-ключ «url» и поле сортировки, называемое «as_of_when», в котором примерно указано, когда произошли совпадения.
"""
A simple hit counter stuff to learn the various apis of dynamodb.
"""
import datetime
import os
import hashlib
import base64
import boto3
from botocore.exceptions import ClientError
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
from pybadges import badge
patch_all()
TABLE_NAME = os.getenv("TABLE_NAME", "hit_counts")
dynamo = boto3.client('dynamodb')
def get_today() -> str:
"""
Gets a formatted date string
"""
return datetime.datetime.now().strftime("%Y-%m-%d")
def get_full_timestamp() -> str:
"""
Gets a complete date string
"""
return datetime.datetime.now().strftime("%Y%m%d%H%M%S")
def get_previous_total(url:str):
"""
Gets the most recent known total. Should be used before inserting a new instance
"""
response = dynamo.query(
TableName=TABLE_NAME,
Select="ALL_ATTRIBUTES",
Limit=1,
KeyConditionExpression="the_url=:urlval",
ScanIndexForward=False,
ExpressionAttributeValues={
":urlval": {
"S": url
}
}
)
if response['Count'] == 0:
return 0
return response['Items'][0]['accumulated_count']['N']
@xray_recorder.capture('insert_new_entry')
def insert_new_entry(url:str, as_of_when : str, user_hash : str, user_id : str):
the_count = str(int(get_previous_total(url)) + 1)
result = dynamo.put_item(
TableName=TABLE_NAME,
Item={
'the_url': {
'S': url
},
'as_of_when': {
'S': as_of_when
},
'today_count': {
'N': '1'
},
'accumulated_count': {
'N': the_count
},
'user_id_hashes': {
'SS': [user_hash]
},
'user_hashes': {
'SS': [user_id]
}
},
ReturnValues="ALL_OLD",
ReturnItemCollectionMetrics="SIZE",
ReturnConsumedCapacity='TOTAL',
ConditionExpression='attribute_not_exists(the_url) and attribute_not_exists(as_of_when)'
)
print('insert_result', result)
return result
@xray_recorder.capture('update_existing_entry')
def update_existing_entry(url:str, as_of_when : str, user_hash : str, user_id : str):
result = dynamo.execute_statement(
Statement=f"""
UPDATE {TABLE_NAME}
SET today_count = today_count + 1
SET accumulated_count = accumulated_count + 1
SET user_hashes = set_add(user_hashes, ?)
SET user_id_hashes = set_add(user_id_hashes, ?)
WHERE the_url = ? AND as_of_when = ? RETURNING ALL NEW *
""",
Parameters=[
{
"SS": [user_id]
},
{
"SS": [user_hash]
},
{
"S": url
},
{
"S": as_of_when
}
]
)
return result
@xray_recorder.capture('get_todays_entry')
def get_todays_entry(url:str, as_of_when : str):
result = dynamo.get_item(
TableName=TABLE_NAME,
Key={
'the_url': {
'S': url
},
'as_of_when': {
'S': as_of_when
}
},
AttributesToGet=[
'today_count',
'accumulated_count',
'user_hashes',
'user_id_hashes'
],
ConsistentRead=True
)
print('get_todays_entry', result)
if 'Item' in result:
return result['Item']
return None
def increment_hit_count(url:str, as_of_when : str, user_hash : str, user_id : str):
"""
increments a counter instance in the dynamo table
"""
current_hits = get_todays_entry(url, as_of_when)
if current_hits is None:
# Insert new entry
x = insert_new_entry(url, as_of_when, user_hash, user_id)
current_hits = {
'accumulated_count': {
'N': "1"
}
}
else:
# Check for existence in existing object
print(current_hits['user_id_hashes'])
print(user_id)
if user_hash not in current_hits['user_id_hashes']['SS']:
result = update_existing_entry(url, as_of_when, user_hash, user_id)
if 'Items' in result:
current_hits = result['Items'][0]
print(result)
return current_hits['accumulated_count']['N']
def hash_api_gateway_event(event : dict):
reqContext = event['requestContext']['http']
reqString = ':'.join((reqContext['sourceIp'], reqContext['protocol'] + reqContext['userAgent']))
m = hashlib.md5()
m.update(reqString.encode('utf-8'))
return base64.b64encode(m.digest()).decode('utf-8'), reqString
def handler(event : dict, context):
"""
# Invoked via query string parameters from an image tag
# Returns a SVG unless text parameter is set.
"""
print(event)
print("hello")
user_hash, og_user_id = hash_api_gateway_event(event)
print(user_hash, og_user_id)
if 'queryStringParameters' in event:
url = event['queryStringParameters']['url']
result = increment_hit_count(url, get_today(), user_hash, og_user_id)
print(result)
body = badge(left_text="Total Views", right_text=result)
return {
"statusCode": 200,
"isBase64Encoded": False,
"headers": {
"Content-Type": "image/svg+xml"
},
"body": body
}
2 ответа
[Edited following discussion in the comments]
Несколько кратких заметок по вашему коду:
- Подсказки вашего типа могут быть более полными / описательными. У вас нет аннотаций к возврату; то
context
параметр в вашемhandler
функция не имеет подсказки типа; и аннотируя вашevent
параметр вhandler
а такжеhash_api_gateway_event
с голымdict
очень мало сообщает контролеру типов. Было бы намного лучше аннотировать эти параметры параметризованнымdict
который информирует средство проверки типов об ожидаемых типах ключей и значений словаря. Вы также можете рассмотретьtyping.TypedDict
, который позволяет вам указать тип значения, которое вы ожидаете связать с отдельными ключами в словаре. - Было бы неплохо иметь строки документации во всех ваших общедоступных функциях, даже если это всего лишь краткое однострочное описание того, чего достигает ваша функция.
- «Плоский лучше, чем вложенный», поэтому подумайте об использовании
from datetime import datetime
а такжеfrom base64 import bs4encode
вместо тогоimport datetime
а такжеimport base64
. В обоих случаях вы используете только один класс / функцию из соответствующего модуля; он делает ваш код более лаконичным и читаемым без какой-либо двусмысленности в отношении того, что делают функции / классы; и немного эффективнее не искатьdatetime
класс вdatetime
модуль всякий раз, когда вы хотите его использовать.
Я мало знаю о DynamoDB, поэтому не могу комментировать, что пытается выполнить ваш код, но с этим (большим) предостережением и за исключением пунктов, которые я упомянул выше, ваш код выглядит довольно чистым с моей точки зрения.
Для вашего формата даты
datetime.datetime.now().strftime("%Y-%m-%d")
более прямо выражается как
datetime.date.today().isoformat()
Для этого выражения:
return response['Items'][0]['accumulated_count']['N']
если вы знаете, что в ответе только один элемент, тогда
item, = response['Items']
return item['accumulated_count']['N']
будет более явным и предложит бесплатно утверждение, что был возвращен ровно один товар.
Этот способ:
def insert_new_entry(url:str, as_of_when : str, user_hash : str, user_id : str):
имеет непоследовательный интервал вокруг подсказок типа; это должно выглядеть как url: str
. Кроме того, вам не хватает подсказки типа возврата, которая в данном случае совпадает с типом из dynamo.put_item
.
print('insert_result', result)
и подобное должно быть преобразовано в вызов журнала.
@Reinderien: да, достаточно справедливо. Я полагаю, что важно четко указать, что функция выполняет что-то динамическое, а не извлекает какое-либо кэшированное свойство. Отредактировали это; спасибо за критику.
— Алекс Вэйгуд