docs: improved write-up

added code documentation

clarified language in `README.md`

updated details in `pyproject.toml`
This commit is contained in:
2024-04-18 18:29:32 +01:00
parent c8cfbadfc6
commit 16ceeb7dae
6 changed files with 123 additions and 14 deletions

View File

@@ -1,14 +1,12 @@
# Request Coalescing in Async Python # Request Coalescing in Async Python
## About A simple experiment revolved around implementing request coalescing in Python, using Asyncio and FastAPI, inspired by the Discord Engineering team's blog post on [How Discord Stores Trillions of Messages.](https://discord.com/blog/how-discord-stores-trillions-of-messages)
This repository is a simple experiment revolved around implementing request coalescing in Python using Asyncio and FastAPI, inspired by the Discord Engineering team's blog post on [How Discord Stores Trillions of Messages.](https://discord.com/blog/how-discord-stores-trillions-of-messages)
## How does it work? ## How does it work?
When a client makes a request for an item of a specific id, it adds a task to the coalescer queue and waits for the result of that task. When a client makes a request for an item of a specific id, it will add a task to the coalescer queue and await the result of that task.
Any subsequent requests, recieved in the meantime, for items of the same ID will subscribe to the future result of that first pending task instead of performing their own read query. Any subsequent requests recieved in the meantime, for items of the same id, will subscribe to the future result of that first pending task instead of performing their own expensive read query.
![Coalescing Diagram](/docs/img-1.png) ![Coalescing Diagram](/docs/img-1.png)
@@ -16,7 +14,6 @@ Any subsequent requests, recieved in the meantime, for items of the same ID will
```bash ```bash
> poetry run pytest > poetry run pytest
tests\test_standard.py Making 5x100 concurrent requests (500 total)... tests\test_standard.py Making 5x100 concurrent requests (500 total)...
Standard Requests: Took 457.228ms Standard Requests: Took 457.228ms
Standard Metrics: {'requests': 500, 'db_calls': 500} Standard Metrics: {'requests': 500, 'db_calls': 500}
@@ -26,10 +23,10 @@ Coalesced Requests: Took 49.328ms
Coalesced Metrics: {'requests': 500, 'db_calls': 100} Coalesced Metrics: {'requests': 500, 'db_calls': 100}
``` ```
The number of database queries has fallen from 1:1 with requests to 1 database call per 5 requests (lining up with there being 5 requests being made concurrently in the tests). The number of database queries has fallen from 1:1 with requests to 1 database call per 5 requests (when coalescing and performing 5 requests concurrently).
## License and Contributing ## License and Contributing
Please feel free to make pull requests containing any suggestions for improvements. Please feel free to open an issue or a pull request to discuss suggestions for improvements.
This repository is open sourced under the [MIT License.](/LICENSE) This repository is open source under the [MIT License.](/LICENSE)

View File

@@ -2,7 +2,7 @@
name = "request-coalescing-py" name = "request-coalescing-py"
version = "0.1.0" version = "0.1.0"
description = "A simple demonstration of request coalescing in asynchronous Python." description = "A simple demonstration of request coalescing in asynchronous Python."
authors = ["Declan <declan@hexolan.dev>"] authors = ["Declan Teevan <dt@hexolan.com>"]
readme = "README.md" readme = "README.md"
[tool.poetry.dependencies] [tool.poetry.dependencies]

View File

@@ -6,25 +6,69 @@ from request_coalescing_py.database import DatabaseRepo
class CoalescingRepo: class CoalescingRepo:
"""The coalescing repository.
This repository is responsible for creating, queuing and
processing futures (for requests).
Attributes:
_repo (DatabaseRepo): Downstream repository called to perform actual requests
_queue (asyncio.Queue): A task queue of requested item_ids awaiting asyncio.Future results
_queued (dict): A map of item_ids (int) -> pending futures (asyncio.Future)
"""
def __init__(self, repo: DatabaseRepo): def __init__(self, repo: DatabaseRepo):
"""Initialise the coalescing repository.
Args:
repo (DatabaseRepo): The downstream database repository to be used
when performing actual requests
"""
self._repo = repo self._repo = repo
self._queue = asyncio.Queue() self._queue = asyncio.Queue()
self._queued = {} # map of item_id: future self._queued = {}
async def get_by_id(self, item_id: int) -> "asyncio.Future[Optional[Item]]": async def get_by_id(self, item_id: int) -> "asyncio.Future[Optional[Item]]":
# Check if there is an already pending request for that item. """Get an item by a specified id.
If there isn't already a pending future, for a requested item_id,
then a new future will be created and added to the task queue,
otherwise the existing future will be returned.
Args:
item_id (int): The requested item's id.
Returns:
asyncio.Future[Optional[Item]]: A future pending result.
"""
# Check if there is an already pending task for that item.
fut = self._queued.get(item_id) fut = self._queued.get(item_id)
if fut: if fut:
return fut return fut
# There is not a pending request. # There is not a pending task.
# Create a new future and add to the task queue.
fut = asyncio.get_event_loop().create_future() fut = asyncio.get_event_loop().create_future()
self._queued[item_id] = fut self._queued[item_id] = fut
await self._queue.put(item_id) await self._queue.put(item_id)
return fut return fut
async def process_queue(self) -> None: async def process_queue(self) -> None:
"""This subroutine is responsible for processing the
requests in the task queue.
1) Recieves an item_id from the task queue
2) Performs the request for the item (by calling the downstream
database `_repo`)
3) Sets the result of the future (fulfilling all pending requests
for that item)
4) Marks the task as complete.
"""
while True: while True:
item_id = await self._queue.get() item_id = await self._queue.get()
item = await self._repo.get_by_id(item_id) item = await self._repo.get_by_id(item_id)

View File

@@ -8,10 +8,27 @@ from request_coalescing_py.models import Item
class DatabaseRepo: class DatabaseRepo:
"""The database repository.
This repository is responsible for database operations.
Attributes:
app (FastAPI): The application instance (for access to the metrics object in state).
_db (databases.Database): The database instance used for requests.
"""
def __init__(self, app: FastAPI) -> None: def __init__(self, app: FastAPI) -> None:
"""Initialise the database repository.
Args:
app (FastAPI): The application instance.
"""
self.app = app self.app = app
async def start_db(self) -> None: async def start_db(self) -> None:
"""Opens a database connection and prepare the environment."""
self._db = Database("sqlite://./test.db") self._db = Database("sqlite://./test.db")
await self._db.connect() await self._db.connect()
@@ -22,9 +39,19 @@ class DatabaseRepo:
pass pass
async def stop_db(self) -> None: async def stop_db(self) -> None:
"""Gracefully closes the database connection."""
await self._db.disconnect() await self._db.disconnect()
async def get_by_id(self, item_id: int) -> Optional[Item]: async def get_by_id(self, item_id: int) -> Optional[Item]:
"""Get an item by a specified id (from the database).
Args:
item_id (int): The requested item's id.
Returns:
Optional[Item]: The item details (if found) or None.
"""
self.app.state.metrics["db_calls"] += 1 self.app.state.metrics["db_calls"] += 1
# Simulate expensive read (50ms) # Simulate expensive read (50ms)

View File

@@ -11,6 +11,7 @@ app = FastAPI()
@app.on_event("startup") @app.on_event("startup")
async def startup_event(): async def startup_event():
"""Prepare the API to take requests"""
# initialise metrics # initialise metrics
app.state.DEFAULT_METRICS = {"requests": 0, "db_calls": 0} app.state.DEFAULT_METRICS = {"requests": 0, "db_calls": 0}
app.state.metrics = app.state.DEFAULT_METRICS.copy() app.state.metrics = app.state.DEFAULT_METRICS.copy()
@@ -19,13 +20,14 @@ async def startup_event():
app.state.repo = DatabaseRepo(app=app) app.state.repo = DatabaseRepo(app=app)
await app.state.repo.start_db() await app.state.repo.start_db()
# initialise worker and coalescing repo # initialise coalescing repo and spawn a worker task
app.state.coalescer = CoalescingRepo(repo=app.state.repo) app.state.coalescer = CoalescingRepo(repo=app.state.repo)
asyncio.create_task(app.state.coalescer.process_queue()) asyncio.create_task(app.state.coalescer.process_queue())
@app.on_event("shutdown") @app.on_event("shutdown")
async def shutdown_event(): async def shutdown_event():
"""Gracefully stop connections on shutdown"""
# close DB connection # close DB connection
await app.state.repo.stop_db() await app.state.repo.stop_db()

View File

@@ -7,11 +7,13 @@ router = APIRouter()
@router.get("/metrics") @router.get("/metrics")
def view_metrics(request: Request) -> dict: def view_metrics(request: Request) -> dict:
"""View the metrics (number of requests and database calls processed)"""
return request.app.state.metrics return request.app.state.metrics
@router.post("/metrics") @router.post("/metrics")
def view_and_reset_metrics(request: Request) -> dict: def view_and_reset_metrics(request: Request) -> dict:
"""View and reset the metrics"""
metrics = request.app.state.metrics metrics = request.app.state.metrics
request.app.state.metrics = request.app.state.DEFAULT_METRICS.copy() request.app.state.metrics = request.app.state.DEFAULT_METRICS.copy()
return metrics return metrics
@@ -19,6 +21,22 @@ def view_and_reset_metrics(request: Request) -> dict:
@router.get("/standard/{item_id}") @router.get("/standard/{item_id}")
async def get_standard_route(request: Request, item_id: int) -> Item: async def get_standard_route(request: Request, item_id: int) -> Item:
"""Get an item by a specified id.
Requests to this route are not coalesced. Directly calls
the database repository to get items.
Args:
request (Request): Used to access the metrics object in app state.
item_id (int): The requested item id.
Raises:
HTTPException: When the requested item is not found.
Returns:
Item: Details of the requested item.
"""
request.app.state.metrics["requests"] += 1 request.app.state.metrics["requests"] += 1
item = await request.app.state.repo.get_by_id(item_id) item = await request.app.state.repo.get_by_id(item_id)
@@ -30,6 +48,27 @@ async def get_standard_route(request: Request, item_id: int) -> Item:
@router.get("/coalesced/{item_id}") @router.get("/coalesced/{item_id}")
async def get_coalesced_route(request: Request, item_id: int) -> Item: async def get_coalesced_route(request: Request, item_id: int) -> Item:
"""Get an item by a specified id.
This route will request a future from the coalescing
repository and await the resulting response of that future,
pending being processed by the task queue.
Requests to this route should reduce the overall number
of database calls being made (should requests be made
simultaneously).
Args:
request (Request): Used to access the metrics object in app state.
item_id (int): The requested item id.
Raises:
HTTPException: When the requested item is not found.
Returns:
Item: Details of the requested item.
"""
request.app.state.metrics["requests"] += 1 request.app.state.metrics["requests"] += 1
item_future = await request.app.state.coalescer.get_by_id(item_id) item_future = await request.app.state.coalescer.get_by_id(item_id)