You've already forked request-coalescing-py
mirror of
https://github.com/hexolan/request-coalescing-py.git
synced 2026-03-26 10:11:16 +00:00
initial commit
This commit is contained in:
0
request_coalescing_py/__init__.py
Normal file
0
request_coalescing_py/__init__.py
Normal file
33
request_coalescing_py/coalescer.py
Normal file
33
request_coalescing_py/coalescer.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
|
||||
from request_coalescing_py.models import Item
|
||||
from request_coalescing_py.database import DatabaseRepo
|
||||
|
||||
|
||||
class CoalescingRepo:
|
||||
def __init__(self, repo: DatabaseRepo):
|
||||
self._repo = repo
|
||||
|
||||
self._queue = asyncio.Queue()
|
||||
self._queued = {} # map of item_id: future
|
||||
|
||||
async def get_by_id(self, item_id: int) -> "asyncio.Future[Optional[Item]]":
|
||||
# Check if there is an already pending request for that item.
|
||||
fut = self._queued.get(item_id)
|
||||
if fut:
|
||||
return fut
|
||||
|
||||
# There is not a pending request.
|
||||
fut = asyncio.get_event_loop().create_future()
|
||||
self._queued[item_id] = fut
|
||||
await self._queue.put(item_id)
|
||||
return fut
|
||||
|
||||
async def process_queue(self) -> None:
|
||||
while True:
|
||||
item_id = await self._queue.get()
|
||||
item = await self._repo.get_by_id(item_id)
|
||||
self._queued[item_id].set_result(item)
|
||||
del self._queued[item_id]
|
||||
self._queue.task_done()
|
||||
37
request_coalescing_py/database.py
Normal file
37
request_coalescing_py/database.py
Normal file
@@ -0,0 +1,37 @@
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import FastAPI
|
||||
from databases import Database
|
||||
|
||||
from request_coalescing_py.models import Item
|
||||
|
||||
|
||||
class DatabaseRepo:
|
||||
def __init__(self, app: FastAPI) -> None:
|
||||
self.app = app
|
||||
|
||||
async def start_db(self) -> None:
|
||||
self._db = Database("sqlite://./test.db")
|
||||
await self._db.connect()
|
||||
|
||||
try:
|
||||
await self._db.execute(query="CREATE TABLE IF NOT EXISTS 'items' (id INTEGER PRIMARY KEY, name TEXT)")
|
||||
await self._db.execute(query="INSERT INTO 'items' (id, name) VALUES (1, 'Test Item')")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def stop_db(self) -> None:
|
||||
await self._db.disconnect()
|
||||
|
||||
async def get_by_id(self, item_id: int) -> Optional[Item]:
|
||||
self.app.state.metrics["db_calls"] += 1
|
||||
|
||||
# Simulate expensive read (50ms)
|
||||
await asyncio.sleep(.05)
|
||||
|
||||
row = await self._db.fetch_one(query="SELECT * FROM 'items' WHERE id = :id", values={"id": item_id})
|
||||
if row:
|
||||
return Item(**row._mapping)
|
||||
|
||||
return None
|
||||
33
request_coalescing_py/main.py
Normal file
33
request_coalescing_py/main.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import asyncio
|
||||
|
||||
from fastapi import FastAPI
|
||||
|
||||
from request_coalescing_py.database import DatabaseRepo
|
||||
from request_coalescing_py.coalescer import CoalescingRepo
|
||||
from request_coalescing_py.routes import router
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def startup_event():
|
||||
# initialise metrics
|
||||
app.state.DEFAULT_METRICS = {"requests": 0, "db_calls": 0}
|
||||
app.state.metrics = app.state.DEFAULT_METRICS.copy()
|
||||
|
||||
# initilise DB repository
|
||||
app.state.repo = DatabaseRepo(app=app)
|
||||
await app.state.repo.start_db()
|
||||
|
||||
# initialise worker and coalescing repo
|
||||
app.state.coalescer = CoalescingRepo(repo=app.state.repo)
|
||||
asyncio.create_task(app.state.coalescer.process_queue())
|
||||
|
||||
|
||||
@app.on_event("shutdown")
|
||||
async def shutdown_event():
|
||||
# close DB connection
|
||||
await app.state.repo.stop_db()
|
||||
|
||||
|
||||
app.include_router(router)
|
||||
6
request_coalescing_py/models.py
Normal file
6
request_coalescing_py/models.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class Item(BaseModel):
|
||||
id: int
|
||||
name: str
|
||||
40
request_coalescing_py/routes.py
Normal file
40
request_coalescing_py/routes.py
Normal file
@@ -0,0 +1,40 @@
|
||||
from fastapi import APIRouter, Request, HTTPException
|
||||
|
||||
from request_coalescing_py.models import Item
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get("/metrics")
|
||||
def view_metrics(request: Request) -> dict:
|
||||
return request.app.state.metrics
|
||||
|
||||
|
||||
@router.post("/metrics")
|
||||
def view_and_reset_metrics(request: Request) -> dict:
|
||||
metrics = request.app.state.metrics
|
||||
request.app.state.metrics = request.app.state.DEFAULT_METRICS.copy()
|
||||
return metrics
|
||||
|
||||
|
||||
@router.get("/standard/{item_id}")
|
||||
async def get_standard_route(request: Request, item_id: int) -> Item:
|
||||
request.app.state.metrics["requests"] += 1
|
||||
|
||||
item = await request.app.state.repo.get_by_id(item_id)
|
||||
if item is None:
|
||||
raise HTTPException(status_code=404, detail="Item Not Found")
|
||||
|
||||
return item
|
||||
|
||||
|
||||
@router.get("/coalesced/{item_id}")
|
||||
async def get_coalesced_route(request: Request, item_id: int) -> Item:
|
||||
request.app.state.metrics["requests"] += 1
|
||||
|
||||
item_future = await request.app.state.coalescer.get_by_id(item_id)
|
||||
item = await item_future
|
||||
if item is None:
|
||||
raise HTTPException(status_code=404, detail="Item Not Found")
|
||||
|
||||
return item
|
||||
Reference in New Issue
Block a user