Apache Kafka 已成為大多數技術棧中的主流組件。使用 Kafka 的好處包括確保事件中的因果順序,同時保持并行性,通過在服務器之間快速復制分區來恢復故障,等等。
然而,運行 Kafka 也面臨著一系列挑戰。雖然許多工程團隊都希望將 Kafka 添加到他們的堆棧中并與“真正的”工程師一起贏得一席之地,但運營開銷構成了強大的進入障礙。
在這篇文章中,我們將重點介紹如何構建一個看起來像傳統單體應用程序但又是松散耦合的事件驅動系統的系統。為此,我們依賴于從領域驅動設計、事件溯源和一致性哈希等概念中學習。
有序事件
大多數系統關心事件的順序。大多數系統中的排序僅限于所考慮的域。例如,當我們查看帖子到一個線程時,我們關心的是相對于帖子的排序。當我們查看金融系統時,排序主要限于賬戶。大型系統中事件的全局排序很少有用,但可能是相關的。
場景:帖子被添加到一個線程Thread中
假設我們對每個添加的帖子都有相當多的后期處理,這反過來會更新線程的某些屬性。
這創建了一個相當好的場景來說明分區的使用。
在這種情況下,默認方法是將所有帖子發送到隊列中,并讓一群工作人員(或消費者)完成工作。這為我們提供了系統所需的并行性,但在我們與多個消費者打交道的那一刻,順序就會丟失。
我們保留順序的唯一方法是確保我們一次處理一個任務,從而才能反映該線程上發生的事情的真實順序。
下一個明顯的想法是使用每個線程的專用隊列來處理相同的問題,但如果我們知道我們將生成大量線程,那立即感覺像是矯枉過正。
分區
分區只是將我們的排隊系統分解為專門的分區。因此,如果我們從一個天真的估計開始,即8個工人每分鐘能夠處理1600個事件,那么我們的設計就從16個分區開始。
你可能需要做更多的工作來確保你的估計是好的,但在這個例子中,我們將以假設它是好的來工作。我們還為一個分區分配一個worker,因為我們希望每個分區都能始終保持因果排序。
現在我們需要確保一個特定線程的帖子都被路由到同一個分區。每個分區都由一個消費者管理,所以我們的排序不會被打亂。
重要的是要記住,"隊列分區 "或 "專用分區 "是一個抽象的結構。它實際上只是一個隊列。我們使用分區這個術語,因為它使我們很容易與該領域廣泛使用的術語保持一致。
一致性哈希
我們將使用一致哈希散列作為一種手段,將屬于特定線程的所有帖子路由到同一隊列分區(或隊列)。
在我們的例子中,我們將使用Murmurhash和一個由名為uHashRing的庫管理這個持續體。
將我們的隊列視為一個連續體
現在,如果我們簡單地將所有的8個隊列放在一個圓圈中,我們會得到這樣的結果。讓我們把這個稱為連續體,因為第7個隊列后面是第一個隊列,即第0個隊列。
現在,一致性散列允許我們使用threadId將一個給定的任務/工作映射到一個特定的隊列。因此,在這種情況下,我們使用threadId作為分區的關鍵。
這里需要注意的一個重要方面是,我們沒有把我們的隊列稱為后處理隊列。它們不是專用隊列。你可以把一個Transaction事務事件扔到這里,并期望相應的消費者(和事件處理程序)來處理它。
事件
在前面的幾段話中,我們已經說了很多關于事件的內容,但我們還沒有真正定義事件的含義。
我們的系統會把事件看成是發生在我們系統中的事實。
事實通常是指以某種方式改變了系統狀態的事情(或者是失敗的事情)。
例如,PostCreatedEvent發生在一個新帖子被創建時。同樣地,當帖子被更新時,PostUpdatedEvent也會發生。
你可以將一個事件映射到你系統中的大多數CRUD操作。
如果將你的系統設計成領域,你會驚訝地發現一個應用服務所觸發的事件的數量。
一個事件也映射了系統的周圍狀態。
讓我們設計一個創建帖子的應用服務:
from typing import List
from .services.base import ServiceBase
from sqlalchemy.session import Session
class PostService(ServiceBase)
def __init__(self, thread_id: UUID, params: PostCreateAPIParams, db_session: Session):
self.thread_id = thread_id
self.params = params
self.user: Union[User, None] = None
self.post: Union[Post, None] = None
self.db_session = db_session
self.errors: List[str] = []
self.error_code: Union[str, None] = None
async def __call__(self)
return await self.invoke()
async def invoke(self):
await self.find_thread()
await self.verify_author()
await self.create_post()
await self.build_response_dao()
await self.trigger_events()
return self
async def find_thread(self):
# truncated for brevity
pass
async def trigger_events(self):
user_dao: UserDAO = UserDAO.from_orm(self.user) if self.user else None
post_dao: PostDAO = PostDAO.from_orm(self.post) if self.post else None
thread_dao: ThreadDAO = ThreadDAO.from_orm(self.thread) if self.thread else None
if await self.has_errors:
event_dao = PostCreatedEventDAO(
user=user_dao,
thread=thread_dao,
post=post_dao,
params=self.params,
errors=self.errors,
error_code= self.error_code
)
else:
event_dao = PostCreationFailedEventDAO(
user=user_dao,
thread=thread_dao,
params=self.params,
post=post_dao
errors=self.errors,
error_code= self.error_code
)
partition_key = (
str(self.thread.id) if self.thread else "PostCreationFailedEvent"
)
await SystemEventService.trigger(partition_key=partition_key, event_dao=event_dao, db_session=self.db_session)
return self
|
在這個例子中, trigger_events方法決定了要發布的事實。在這種情況下,它收集了周圍的上下文。這也可能包括請求參數(如傳遞到事件中的params屬性)。然而,什么是正確捕獲的上下文也取決于上下文:)。
因此,我們的最終事件可能看起來像這樣。請注意,該事件沒有一個 updated_at 屬性,因為我們認為事件是不可改變的事實。我們不能撤消已經發生的事情。
{
"event_name": "PostCreatedEvent",
"event_id": "0fb6a4d4-ae65-4f18-be44-edb9ace6b5bb",
"event_version": "v1.0",
"time": "2022-09-03T04:16:59.294509+00:00",
"payload": {
"user": { "user_id":"1a1269ee-6b6f-4325-8562-cb169a68e7b3", "is_blocked": false, "first_name": "Siddharth", "last_name": "R", "email": "sid@........},
"post" :{ "post_id": "fa3e7b12-4908-4d53-be11-629e6f47ae90", "thread_id": "666d0404-0756-4b93-892f-19d9b4b25a99", ...... },
"thread": { "thread_id": "666d0404-0756-4b93-892f-19d9b4b25a99", .....},
"params": { .... },
"errors": [ ... ],
"error_code": ""
},
"created_at": "2022-09-03T04:16:59.294"
"logged_at": "2022-09-03T04:16:59.294"
}
|
在我們的例子中,應用服務通過調用觸發方法將事件轉給一個叫做SystemEventsService的服務。 該方法在為我們實際發布該事件之前做了一系列的工作。它通過我們先前看到的連續體運行,根據我們傳遞給它的分區鍵識別隊列(和相應的工作者worker)。 這幾乎就是我們需要一致的散列的原因。這可以確保我們的事件總是由同一個分區(和工作者)處理。
因此,一旦我們為我們的任務確定了工作者,我們就要求工作者
- 保留該事件,以備我們以后需要再來處理它
- 將其發布給所有相關的工作者
- 讓我們訂閱該任務的事件驅動型工作負載觸發其工作流程。
將事件分配到正確的分區
@staticmethod
async def trigger(
partition_key: str, event_dao: SystemEventDAO
):
try:
worker: SystemEventPartitionConfig = await SystemEventsService._get_worker(
partition_key=partition_key
)
worker_func = getattr(system_events_workers, worker.worker_name)
log_info(msg=f"Trigger called with worker: {worker.worker_name}")
worker_func.delay(event_dao=event_dao.json())
except (OperationalError, ConnectionError) as e:
log_error(msg=f"[redisError] {e}", e=e, method="trigger", loc=f"{__name__}")
except Exception as e:
log_error(
msg=f"SystemEventError: {e}", e=e, method="trigger", loc=f"{__name__}"
)
return
@staticmethod
async def _get_worker(partition_key: str) -> SystemEventPartitionConfig:
"""For a given string it returns the worker that should process the event by running it through a murmurr hashing
function and uses that to fetch the nodes from the continuum"""
node = ring.get(key=partition_key)
nodename = node.get("nodename", None)
if not nodename:
raise ValueError("Could not find a node in the continuum for key {node}")
node_config = continuum.get(nodename, None)
if not node_config:
raise ValueError(
"Could not find a node in the continuum for key {nodename}"
)
config_attrs = {"partition_key": partition_key, "partition_id": nodename}
config_attrs = {**node_config, **config_attrs}
return SystemEventPartitionConfig(**config_attrs)
|
事件驅動的系統
下面是最好的部分:
現在整個系統可以讓你把你的應用程序作為一系列的異步事件處理程序來運行,這些處理程序可以在特定的事件上被調用。當一個事件到達正確的分區時,工作者會將該事件分配給一系列的事件處理程序。
async def create_system_event(
task_type, event_dao: SystemEventDAO, db_session: Session = None
):
if not db_session:
db_session = get_session()
system_event: Union[SystemEvent, None] = None
try:
if event_dao.event_name not in SYSTEM_GENERATED_REQUEST_EVENTS:
system_event = await SystemEventsService.create(
event_dao=event_dao, db_session=db_session
)
if system_event:
log_info(
msg=f"system_event with id: {system_event.id} created for event_name: {system_event.event_name}"
)
event_dao.id = system_event.id
else:
log_info(
msg=f"system_generated_request_event with name: {event_dao.event_name} ready for processing."
)
await EventHandler.process(event_dao=event_dao, db_session=db_session)
except Exception as e:
log_error(
msg=f"Error handling events: {event_dao.event_name}: {e} \n {traceback.print_exc()}"
)
capture_exc(error=e)
finally:
db_session.close()
return system_event
|
分區工作者持久化該事件,并將其分派給EventHandler。 事件處理程序是一系列可獨立部署的函數,可以做任何你想做的事情。
如:
handlers = [
PostInsightsGeneratorEventHandler,
ThreadActivityManagerEventHandler,
SpamDetectionEventHandler,
#...,
#....,
ImageResizerEventHandler,
]
|
而我們的處理器可以以任何你喜歡的方式處理它們。在這里,我們按順序處理它們,但它也可以并行派發:
class EventHandler:
@staticmethod
async def process(event_dao: SystemEventDAO, db_session: Session = None):
log_info(msg=f'Event: {event_dao.event_name} arrived')
for event_handler in handlers:
await event_handler.process(event_dao=event_dao, db_session=db_session)
return
|
處理程序本身是一個相當簡單的類,它檢查相關的事件。
class PostInsightsGeneratorEventHandler(event_dao: SystemEventDAO, db_session: Session):
if not event_dao.event_name == "PostCreatedEvent" :
return
log_info(msg=f"PostCreatedEventHandler called with {event_dao.event_id}")
# Do whatever you need to here |