From 16ceeb7daefa1624fc0027ba589096695e6c0efe Mon Sep 17 00:00:00 2001
From: Declan Teevan
Date: Thu, 18 Apr 2024 18:29:32 +0100
Subject: [PATCH] docs: improved write-up
added code documentation
clarified language in `README.md`
updated details in `pyproject.toml`
---
README.md | 15 ++++-----
pyproject.toml | 2 +-
request_coalescing_py/coalescer.py | 50 ++++++++++++++++++++++++++++--
request_coalescing_py/database.py | 27 ++++++++++++++++
request_coalescing_py/main.py | 4 ++-
request_coalescing_py/routes.py | 39 +++++++++++++++++++++++
6 files changed, 123 insertions(+), 14 deletions(-)
diff --git a/README.md b/README.md
index 9d90355..b4f6985 100644
--- a/README.md
+++ b/README.md
@@ -1,14 +1,12 @@
# Request Coalescing in Async Python
-## About
-
-This repository is a simple experiment revolved around implementing request coalescing in Python using Asyncio and FastAPI, inspired by the Discord Engineering team's blog post on [How Discord Stores Trillions of Messages.](https://discord.com/blog/how-discord-stores-trillions-of-messages)
+A simple experiment revolved around implementing request coalescing in Python, using Asyncio and FastAPI, inspired by the Discord Engineering team's blog post on [How Discord Stores Trillions of Messages.](https://discord.com/blog/how-discord-stores-trillions-of-messages)
## How does it work?
-When a client makes a request for an item of a specific id, it adds a task to the coalescer queue and waits for the result of that task.
+When a client makes a request for an item of a specific id, it will add a task to the coalescer queue and await the result of that task.
-Any subsequent requests, recieved in the meantime, for items of the same ID will subscribe to the future result of that first pending task instead of performing their own read query.
+Any subsequent requests recieved in the meantime, for items of the same id, will subscribe to the future result of that first pending task instead of performing their own expensive read query.

@@ -16,7 +14,6 @@ Any subsequent requests, recieved in the meantime, for items of the same ID will
```bash
> poetry run pytest
-
tests\test_standard.py Making 5x100 concurrent requests (500 total)...
Standard Requests: Took 457.228ms
Standard Metrics: {'requests': 500, 'db_calls': 500}
@@ -26,10 +23,10 @@ Coalesced Requests: Took 49.328ms
Coalesced Metrics: {'requests': 500, 'db_calls': 100}
```
-The number of database queries has fallen from 1:1 with requests to 1 database call per 5 requests (lining up with there being 5 requests being made concurrently in the tests).
+The number of database queries has fallen from 1:1 with requests to 1 database call per 5 requests (when coalescing and performing 5 requests concurrently).
## License and Contributing
-Please feel free to make pull requests containing any suggestions for improvements.
+Please feel free to open an issue or a pull request to discuss suggestions for improvements.
-This repository is open sourced under the [MIT License.](/LICENSE)
\ No newline at end of file
+This repository is open source under the [MIT License.](/LICENSE)
diff --git a/pyproject.toml b/pyproject.toml
index 86d20ca..9164fed 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -2,7 +2,7 @@
name = "request-coalescing-py"
version = "0.1.0"
description = "A simple demonstration of request coalescing in asynchronous Python."
-authors = ["Declan "]
+authors = ["Declan Teevan "]
readme = "README.md"
[tool.poetry.dependencies]
diff --git a/request_coalescing_py/coalescer.py b/request_coalescing_py/coalescer.py
index 16e7ecf..48e2695 100644
--- a/request_coalescing_py/coalescer.py
+++ b/request_coalescing_py/coalescer.py
@@ -6,25 +6,69 @@ from request_coalescing_py.database import DatabaseRepo
class CoalescingRepo:
+ """The coalescing repository.
+
+ This repository is responsible for creating, queuing and
+ processing futures (for requests).
+
+ Attributes:
+ _repo (DatabaseRepo): Downstream repository called to perform actual requests
+ _queue (asyncio.Queue): A task queue of requested item_ids awaiting asyncio.Future results
+ _queued (dict): A map of item_ids (int) -> pending futures (asyncio.Future)
+
+ """
+
def __init__(self, repo: DatabaseRepo):
+ """Initialise the coalescing repository.
+
+ Args:
+ repo (DatabaseRepo): The downstream database repository to be used
+ when performing actual requests
+
+ """
self._repo = repo
self._queue = asyncio.Queue()
- self._queued = {} # map of item_id: future
+ self._queued = {}
async def get_by_id(self, item_id: int) -> "asyncio.Future[Optional[Item]]":
- # Check if there is an already pending request for that item.
+ """Get an item by a specified id.
+
+ If there isn't already a pending future, for a requested item_id,
+ then a new future will be created and added to the task queue,
+ otherwise the existing future will be returned.
+
+ Args:
+ item_id (int): The requested item's id.
+
+ Returns:
+ asyncio.Future[Optional[Item]]: A future pending result.
+
+ """
+ # Check if there is an already pending task for that item.
fut = self._queued.get(item_id)
if fut:
return fut
- # There is not a pending request.
+ # There is not a pending task.
+ # Create a new future and add to the task queue.
fut = asyncio.get_event_loop().create_future()
self._queued[item_id] = fut
await self._queue.put(item_id)
return fut
async def process_queue(self) -> None:
+ """This subroutine is responsible for processing the
+ requests in the task queue.
+
+ 1) Recieves an item_id from the task queue
+ 2) Performs the request for the item (by calling the downstream
+ database `_repo`)
+ 3) Sets the result of the future (fulfilling all pending requests
+ for that item)
+ 4) Marks the task as complete.
+
+ """
while True:
item_id = await self._queue.get()
item = await self._repo.get_by_id(item_id)
diff --git a/request_coalescing_py/database.py b/request_coalescing_py/database.py
index 97fc28c..6e34633 100644
--- a/request_coalescing_py/database.py
+++ b/request_coalescing_py/database.py
@@ -8,10 +8,27 @@ from request_coalescing_py.models import Item
class DatabaseRepo:
+ """The database repository.
+
+ This repository is responsible for database operations.
+
+ Attributes:
+ app (FastAPI): The application instance (for access to the metrics object in state).
+ _db (databases.Database): The database instance used for requests.
+
+ """
+
def __init__(self, app: FastAPI) -> None:
+ """Initialise the database repository.
+
+ Args:
+ app (FastAPI): The application instance.
+
+ """
self.app = app
async def start_db(self) -> None:
+ """Opens a database connection and prepare the environment."""
self._db = Database("sqlite://./test.db")
await self._db.connect()
@@ -22,9 +39,19 @@ class DatabaseRepo:
pass
async def stop_db(self) -> None:
+ """Gracefully closes the database connection."""
await self._db.disconnect()
async def get_by_id(self, item_id: int) -> Optional[Item]:
+ """Get an item by a specified id (from the database).
+
+ Args:
+ item_id (int): The requested item's id.
+
+ Returns:
+ Optional[Item]: The item details (if found) or None.
+
+ """
self.app.state.metrics["db_calls"] += 1
# Simulate expensive read (50ms)
diff --git a/request_coalescing_py/main.py b/request_coalescing_py/main.py
index f0d9e23..3130ecb 100644
--- a/request_coalescing_py/main.py
+++ b/request_coalescing_py/main.py
@@ -11,6 +11,7 @@ app = FastAPI()
@app.on_event("startup")
async def startup_event():
+ """Prepare the API to take requests"""
# initialise metrics
app.state.DEFAULT_METRICS = {"requests": 0, "db_calls": 0}
app.state.metrics = app.state.DEFAULT_METRICS.copy()
@@ -19,13 +20,14 @@ async def startup_event():
app.state.repo = DatabaseRepo(app=app)
await app.state.repo.start_db()
- # initialise worker and coalescing repo
+ # initialise coalescing repo and spawn a worker task
app.state.coalescer = CoalescingRepo(repo=app.state.repo)
asyncio.create_task(app.state.coalescer.process_queue())
@app.on_event("shutdown")
async def shutdown_event():
+ """Gracefully stop connections on shutdown"""
# close DB connection
await app.state.repo.stop_db()
diff --git a/request_coalescing_py/routes.py b/request_coalescing_py/routes.py
index 945cb65..0242024 100644
--- a/request_coalescing_py/routes.py
+++ b/request_coalescing_py/routes.py
@@ -7,11 +7,13 @@ router = APIRouter()
@router.get("/metrics")
def view_metrics(request: Request) -> dict:
+ """View the metrics (number of requests and database calls processed)"""
return request.app.state.metrics
@router.post("/metrics")
def view_and_reset_metrics(request: Request) -> dict:
+ """View and reset the metrics"""
metrics = request.app.state.metrics
request.app.state.metrics = request.app.state.DEFAULT_METRICS.copy()
return metrics
@@ -19,6 +21,22 @@ def view_and_reset_metrics(request: Request) -> dict:
@router.get("/standard/{item_id}")
async def get_standard_route(request: Request, item_id: int) -> Item:
+ """Get an item by a specified id.
+
+ Requests to this route are not coalesced. Directly calls
+ the database repository to get items.
+
+ Args:
+ request (Request): Used to access the metrics object in app state.
+ item_id (int): The requested item id.
+
+ Raises:
+ HTTPException: When the requested item is not found.
+
+ Returns:
+ Item: Details of the requested item.
+
+ """
request.app.state.metrics["requests"] += 1
item = await request.app.state.repo.get_by_id(item_id)
@@ -30,6 +48,27 @@ async def get_standard_route(request: Request, item_id: int) -> Item:
@router.get("/coalesced/{item_id}")
async def get_coalesced_route(request: Request, item_id: int) -> Item:
+ """Get an item by a specified id.
+
+ This route will request a future from the coalescing
+ repository and await the resulting response of that future,
+ pending being processed by the task queue.
+
+ Requests to this route should reduce the overall number
+ of database calls being made (should requests be made
+ simultaneously).
+
+ Args:
+ request (Request): Used to access the metrics object in app state.
+ item_id (int): The requested item id.
+
+ Raises:
+ HTTPException: When the requested item is not found.
+
+ Returns:
+ Item: Details of the requested item.
+
+ """
request.app.state.metrics["requests"] += 1
item_future = await request.app.state.coalescer.get_by_id(item_id)