Asynchronous pipeline
The goal of the pipeline is to process the input data in several consecutive steps with minimal latency. When data is handled in real time, some steps usually have to wait idly for data to arrive from the upstream sources. SNAP uses Python's asyncio to make these waits asynchronous, so that one part of the pipeline can continue processing while another is waiting.
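The idea of asynchronous waiting can be illustrated with plain asyncio, independently of SNAP. This is a minimal sketch: the names `reader` and `main` and the delays are made up for illustration and are not part of SNAP.

```python
import asyncio

async def reader(name, delay, n, log):
    """Pretend to wait for upstream data n times, delay seconds each."""
    for i in range(n):
        await asyncio.sleep(delay)  # while this one waits, the other can run
        log.append((name, i))

async def main():
    log = []
    # both readers share one event loop and run concurrently
    await asyncio.gather(
        reader("slow", 0.2, 2, log),
        reader("fast", 0.01, 3, log),
    )
    return log

log = asyncio.run(main())
print(log)
```

The "fast" reader finishes all of its work while the "slow" one is still waiting for its first portion, which is the behaviour the pipeline relies on.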
Each pipeline application is defined as a Node, which consists of one or several Chains. Each Chain is composed of a Source and one or several Steps that process the Data portions.
This section defines the basic terms used in SNAP and shows how to use them:
Node
A single Python process, which consists of one or more Chains and runs all of them simultaneously (asynchronously).
It is defined in the configuration file as a mapping from the node name to its chains:
node: !Node [:<node name>]
  - !chain [:<chain name1>]
    #chain1 configuration here
  - !chain [:<chain name2>]
    #chain2 configuration here
where <node name> is the name of this node, describing its purpose.
This name is used when running the snap program with the snap_run --node <node name> option.
- Note:
Node names within one file must be different. Chain names within the node must be different.
Chain
A Chain defines a single pipeline: it gets the data from its Source, processes it through its Steps one by one, and optionally forwards the result to one or several other chains (targets).
A Chain is defined in the configuration file and contains a list of elements:
!chain [:<chain name>]
  - !from <source element>
  - <step element 1>
  - <step element 2>
  - !to [<chain_name1>, <chain_name2>]
- !from:
The element with the !from keyword defines a Source element: this is the input of the chain. If the chain has no !from element, it is expected to receive data from another chain (if it is listed in that chain's !to section).
- !to:
A section with the names of one or more other Chains within the same node, to which the output of the last step should be forwarded. If it is missing, the data from the last step is not forwarded.
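The data flow described above can be sketched in plain asyncio. This is not SNAP's implementation: `run_chain`, `queue_source`, `numbers`, the use of an `asyncio.Queue` as the link between chains, and `None` as an end-of-data marker are all assumptions made for illustration.

```python
import asyncio

async def numbers(n):
    """Hypothetical source: yields 0..n-1."""
    for i in range(n):
        await asyncio.sleep(0)
        yield i

async def queue_source(queue):
    """Input of a chain without !from: reads what another chain forwards.
    None is used here as an assumed end-of-data marker."""
    while (item := await queue.get()) is not None:
        yield item

async def run_chain(source, steps, targets=()):
    """Pass each data portion through the steps, then forward it to the targets."""
    results = []
    async for data in source:
        for step in steps:
            data = step(data)
        results.append(data)
        for q in targets:
            await q.put(data)
    for q in targets:
        await q.put(None)  # tell the targets that no more data will come
    return results

async def main():
    link = asyncio.Queue()  # plays the role of "!to [second]"
    first = run_chain(numbers(3), [lambda d: d + 1], targets=[link])
    second = run_chain(queue_source(link), [lambda d: d * 10])
    # both chains run concurrently, as chains of one node would
    return await asyncio.gather(first, second)

out = asyncio.run(main())
print(out)
```

Here the first chain has its own source and forwards its output, while the second chain has no source of its own and only receives what the first one sends.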
Data portion
The pipeline processes data in portions. A portion can be any Python object: a number, tuple, string, function, etc. Data is produced by the Source and processed in the Steps.
Source
An asynchronous (or synchronous) generator producing Data portions.
A simple example of a Source:
#source from async generator
import asyncio
import datetime

async def gen_timestamp(delay=1):
    """generate current timestamps with given delay"""
    while True:
        await asyncio.sleep(delay)
        yield datetime.datetime.now()
In practice, a source can yield data as it arrives in a file or over the network.
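Since a Source may also be a synchronous generator, the sketch below shows one and a way to consume both kinds uniformly. The adapter `as_async` and the helper `collect` are hypothetical; how SNAP itself wraps synchronous generators is not shown in this section.

```python
import asyncio

def countdown(start=3):
    """Synchronous source: yields start, start-1, ..., 1."""
    while start > 0:
        yield start
        start -= 1

async def as_async(source):
    """Hypothetical adapter: iterate a sync or async generator the same way."""
    if hasattr(source, "__aiter__"):
        async for d in source:
            yield d
    else:
        for d in source:
            yield d
            await asyncio.sleep(0)  # give other tasks a chance to run

async def collect(source):
    return [d async for d in as_async(source)]

result = asyncio.run(collect(countdown(3)))
print(result)  # [3, 2, 1]
```

Note that a synchronous generator that blocks (for example on file or network reads) would still block the whole event loop, so an asynchronous generator is the safer choice for real-time inputs.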
Step
A Step defines any data manipulation. Steps can be loosely classified into Transformation, Filter and Buffer.
In the configuration file steps are provided as a list in the steps: section inside the Chain definition.
Transformation
A Transformation is a Step that manipulates one data portion and returns the result, which is fed to the next step.
It can be just a function of the data, as in this example:
#function without parameters
def dump(d):
    print(f'DUMP: {d}')
    return d
and referenced in the configuration file as:
steps:
  - foo.bar.dump
If the processing function needs configurable parameters, it should be defined as a functor, or as a function of the parameters that returns a function of the data, as in this example:
#function with parameters
def dump(prefix="DUMP"):
    def _f(d):
        print(f'{prefix} {d}')
        return d
    return _f
and described in the configuration with parameters:
steps:
  - foo.bar.dump: {prefix: "Here's what I got:"}
- Note:
Arguments are passed to the function or functor constructor as keyword arguments.
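The functor variant can be written as a class whose constructor takes the parameters and whose `__call__` takes the data. The class `AddOffset` and its `offset` parameter are hypothetical, and the mapping from the configuration entry to the constructor call is an assumption based on the note above.

```python
class AddOffset:
    """Hypothetical functor: parameters go to __init__, data goes to __call__."""

    def __init__(self, offset=0):
        self.offset = offset

    def __call__(self, d):
        return d + self.offset

# Assumption: a configuration entry such as
#   - foo.bar.AddOffset: {offset: 10}
# results in roughly this call, with the mapping passed as keyword arguments
step = AddOffset(**{"offset": 10})
print(step(5))  # 15
```

A class is convenient when the step needs to keep state between data portions; for stateless steps the nested-function form is shorter.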
Filter
A Filter is a Step that receives all the data portions, but produces results only after some of them.
It can be defined as an asynchronous generator function:
# filter example: coroutine
async def positive(source):
    """ yield positive values """
    async for d in source:
        if d > 0:
            yield d
and described in the configuration as:
steps:
  - foo.bar.positive
or as a function that produces such a coroutine, if the algorithm needs parameters:
# filter example with parameters
def threshold(val=0):
    """ yield values above 'val' """
    async def _f(source):
        async for d in source:
            if d > val:
                yield d
    return _f
and described in the configuration as:
steps:
  - foo.bar.threshold: {val: 1}
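To see what such a filter does to a stream, it can be driven by hand with a small test source. The source `values` and the driver `main` are made up for this sketch; in SNAP the wiring is done by the chain itself.

```python
import asyncio

def threshold(val=0):
    """ yield values above 'val' """
    async def _f(source):
        async for d in source:
            if d > val:
                yield d
    return _f

async def values():
    """Hypothetical test source with a fixed list of values."""
    for d in [-1, 0, 1, 2, 3]:
        yield d

async def main():
    step = threshold(val=1)  # same parameters as in the configuration above
    return [d async for d in step(values())]

result = asyncio.run(main())
print(result)  # [2, 3]
```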
Buffer
A Buffer is a Step which processes the data, but whose input loop is decoupled from its output loop. An example is buffering the data and producing the accumulated data every 10 seconds.
A buffer object is defined as a Python class implementing the async def put and async def get methods. Example:
import asyncio

class Buffer:
    def __init__(self, buffer_time=10):
        """object to accumulate the data in the time bins"""
        self.data = []
        self.buffer_time = buffer_time
    async def put(self, data):
        self.data.append(data)
    async def get(self):
        # returns the accumulated data approximately every buffer_time seconds
        await asyncio.sleep(self.buffer_time)
        res = self.data
        self.data = []
        return res
In the configuration file it is defined as follows (if Buffer is inside the foo.bar Python module):
steps:
  - foo.bar.Buffer: {buffer_time: 10}
- Note:
A buffer can also be used as a Source of a chain. In that case, if the data flows from another chain, it will be put in the buffer.
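The decoupling of input and output can be seen by driving a buffer from two independent coroutines. This is a sketch under assumptions: `feed`, `drain` and the short timings are made up for illustration, and SNAP's own way of calling `put` and `get` may differ.

```python
import asyncio

class Buffer:
    """Same interface as the Buffer example above."""
    def __init__(self, buffer_time=10):
        self.data = []
        self.buffer_time = buffer_time
    async def put(self, data):
        self.data.append(data)
    async def get(self):
        await asyncio.sleep(self.buffer_time)
        res, self.data = self.data, []
        return res

async def feed(buf, n, delay):
    """Input side: put one portion every 'delay' seconds."""
    for i in range(n):
        await buf.put(i)
        await asyncio.sleep(delay)

async def drain(buf, n_batches):
    """Output side: collect accumulated batches at the buffer's own pace."""
    return [await buf.get() for _ in range(n_batches)]

async def main():
    buf = Buffer(buffer_time=0.1)
    _, batches = await asyncio.gather(feed(buf, 6, 0.02), drain(buf, 3))
    return batches

batches = asyncio.run(main())
print(batches)
```

How the six portions are split between the batches depends on timing, but every portion appears exactly once and in the original order.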