In response to certain situations in our systems, we want to send real-time notifications to users. For that we use the Ably.io library. But as the application grew, we ran into a problem: the application started to work slowly and to consume more resources, which costs us more. Most importantly, we saw that some events were duplicated, which caused a lot of errors in the web and mobile applications.
Based on that, we decided to implement a mechanism whose responsibility is to collect events, group them, eliminate duplicates and then, after some interval, send them to the end devices (mobile, web). This mechanism should be fully transparent to the client of the batcher (the developer), who shouldn't need to know how it works under the hood. The only thing we ask of the developer is to decide which strategy to use when selecting events to send, which comes down to a registration in an IoC container. In our codebase (Autofac) it looks like this:
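A minimal sketch of such a registration, assuming a hypothetical `IStrategy<T>` contract and a hypothetical `DistinctStrategy<T>` implementation (both names are illustrative and not taken from the codebase), could be:

```csharp
using System.Collections.Generic;
using System.Linq;
using Autofac;

// Hypothetical contract: decides which of the collected events get sent.
public interface IStrategy<T>
{
    IEnumerable<T> Select(IReadOnlyList<T> events);
}

// Hypothetical strategy: drops exact duplicates from a batch.
public sealed class DistinctStrategy<T> : IStrategy<T>
{
    public IEnumerable<T> Select(IReadOnlyList<T> events) => events.Distinct();
}

public static class BatcherRegistration
{
    public static IContainer Build()
    {
        var builder = new ContainerBuilder();

        // Open-generic registration: every IStrategy<T> resolves to DistinctStrategy<T>.
        builder.RegisterGeneric(typeof(DistinctStrategy<>))
               .As(typeof(IStrategy<>))
               .SingleInstance();

        return builder.Build();
    }
}
```

Swapping the strategy then means changing a single registration line, while the classes that use the batcher stay untouched.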
Of course, the developer also needs to inject the batcher into the class where it will be used and then invoke the SendAsync method on the injected service, which sends the message(s). It looks like this:
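As a rough sketch of that usage: only `SendAsync` is named in the text, so the `IBatcher<T>` interface, the `OrderShipped` event and the `OrderService` class below are hypothetical names chosen for illustration.

```csharp
using System.Threading.Tasks;

// Hypothetical batcher contract; only the SendAsync name comes from the text.
public interface IBatcher<T>
{
    Task SendAsync(T message);
}

// Hypothetical event type.
public sealed record OrderShipped(string OrderId);

public sealed class OrderService
{
    private readonly IBatcher<OrderShipped> _batcher;

    // The batcher arrives through constructor injection from the container.
    public OrderService(IBatcher<OrderShipped> batcher) => _batcher = batcher;

    // The caller only hands the event over; grouping, de-duplication
    // and the actual send happen later, inside the batcher.
    public Task ShipAsync(string orderId) =>
        _batcher.SendAsync(new OrderShipped(orderId));
}
```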
Going into the details, the implementation of the batcher looks like this:
As you can see, we used the TPL Dataflow library here. Going through the code of the Batcher: we used a BatchBlock, which collects incoming events and releases them as a group. A Timer triggers it every x milliseconds. Thanks to this, we can collect events on our side, process them with an IStrategy service, and finally send them in groups whose maximum size we define in the constructor. This also guarantees that we never send more than n events at once, which is very helpful for staying within per-second/minute/day quota limits.
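The flow described above can be sketched roughly as follows. This is a simplified illustration and not the actual Batcher: the constructor parameters (`maxBatchSize`, `interval`, `strategy`, `publish`) and the `CompleteAsync` method are assumptions, and the strategy is passed as a plain delegate to keep the example self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public sealed class Batcher<T> : IDisposable
{
    private readonly BatchBlock<T> _batch;
    private readonly ActionBlock<T[]> _sender;
    private readonly Timer _timer;

    public Batcher(int maxBatchSize, TimeSpan interval,
                   Func<IReadOnlyList<T>, IEnumerable<T>> strategy,
                   Func<IReadOnlyList<T>, Task> publish)
    {
        // Emits a group as soon as maxBatchSize events have been collected.
        _batch = new BatchBlock<T>(maxBatchSize);

        // Applies the strategy to each group and hands the result to the provider.
        _sender = new ActionBlock<T[]>(async events =>
        {
            var selected = strategy(events).ToList();
            if (selected.Count > 0) await publish(selected);
        });

        _batch.LinkTo(_sender, new DataflowLinkOptions { PropagateCompletion = true });

        // Flushes whatever has been collected so far, even if the group is not full.
        _timer = new Timer(_ => _batch.TriggerBatch(), null, interval, interval);
    }

    public Task<bool> SendAsync(T message) => _batch.SendAsync(message);

    // Stops the timer, flushes the remaining events and waits for the sender.
    public async Task CompleteAsync()
    {
        _timer.Dispose();
        _batch.Complete();
        await _sender.Completion;
    }

    public void Dispose() => _timer.Dispose();
}
```

Because the size limit lives in the BatchBlock and the time limit lives in the Timer, a group goes out when either of them is reached, whichever happens first.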
IStrategy, which you can see here, is a service responsible for additional grouping/filtering of events if needed. Its implementation looks like this:
The Example element looks like this:
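To make the idea concrete, here is one possible strategy together with an event type. The fields of `Example`, the `Select` method and the `LatestPerKeyStrategy` class are assumptions made for this sketch; the real types may look different.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event shape; the fields are assumed for illustration.
public sealed record Example(string Channel, string Name, string Payload, DateTimeOffset CreatedAt);

// Hypothetical contract: decides which of the collected events get sent.
public interface IStrategy<T>
{
    IEnumerable<T> Select(IReadOnlyList<T> events);
}

// Keeps only the most recent event for each (Channel, Name) pair,
// so repeated updates of the same thing collapse into one notification.
public sealed class LatestPerKeyStrategy : IStrategy<Example>
{
    public IEnumerable<Example> Select(IReadOnlyList<Example> events) =>
        events.GroupBy(e => (e.Channel, e.Name))
              .Select(g => g.OrderByDescending(e => e.CreatedAt).First());
}
```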
One more thing I had to add was reporting the state of the Batcher. This is needed because once an event is added to the queue and passed on to the next blocks, information about errors is lost. I started by implementing interceptors with TransformBlock. Their task is to log the flow between blocks: as you may have noticed, there are three blocks (receiving, storing and triggering), and every hand-off between them is logged. Beyond that, I decided that the code that finally sends a message to the external provider should be wrapped in a try/catch block, because we don't know what could happen inside an external library. Without it, the exception would be caught by the block and we wouldn't notice that something bad had happened.
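Both ideas, the logging interceptor and the guarded send, can be sketched in a few lines. The `Pipeline` class, its method names and the `log` delegate are hypothetical; a real implementation would most likely use a proper logger instead of an `Action<string>`.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Pipeline
{
    // Pass-through block: logs each item on its way between two stages
    // and forwards it unchanged.
    public static TransformBlock<T, T> Interceptor<T>(string stage, Action<string> log) =>
        new TransformBlock<T, T>(item =>
        {
            log($"{stage}: {item}");
            return item;
        });

    // Terminal block: a failure in the external call is logged here
    // instead of silently faulting the block.
    public static ActionBlock<T> SafeSender<T>(Func<T, Task> publish, Action<string> log) =>
        new ActionBlock<T>(async item =>
        {
            try
            {
                await publish(item);
            }
            catch (Exception ex)
            {
                log($"send failed: {ex.Message}");
            }
        });
}
```

An interceptor is linked between two stages with `LinkTo`, for example `interceptor.LinkTo(sender, new DataflowLinkOptions { PropagateCompletion = true })`. An unhandled exception inside a dataflow block only shows up on the block's `Completion` task, which is easy to miss, so the catch keeps the failure visible and lets the block go on processing later items.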
As you can see, thanks to the TPL Dataflow library we were able to implement a small mechanism that batches/stores data based on time and size. This wasn't provided by Ably.IO (or we didn't "google" enough), although it is available, for example, in EventBus thanks to a partition key. Beyond that, the code still looks pretty simple and clear and can be used across the whole system.
The code can be found here: github
Thanks for reading!