Application Development
Unlike sequential processing of data, dranspose leverages parallel processing to achieve the throughput necessary for live analysis.
We use the map-reduce programming model for distributing work. Parallel workers have to work independently: they only have access to a single event at a time. They may keep local state, but cannot access arbitrary other events.
For data analysis that has to combine all events, there is a secondary reduce step which gets all events delivered, but can only cope with already reduced data.
A lot of common analysis tasks map easily onto this programming model. The map phase performs the heavy lifting, e.g. analysing images, and then forwards a spectrum to the reducer, which only averages the spectra or appends them to a list.
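As a schematic sketch of this split (plain Python for illustration only, not the dranspose API used below): the map step turns each detector image into a spectrum, and the reduce step only keeps a running average of the already reduced spectra.

import numpy as np


def map_event(image: np.ndarray) -> np.ndarray:
    # heavy lifting: collapse a 2D detector image into a 1D spectrum
    return image.sum(axis=0)


class RunningAverage:
    def __init__(self) -> None:
        self.total = None
        self.count = 0

    def reduce(self, spectrum: np.ndarray) -> None:
        # cheap aggregation on the already reduced data only
        self.total = spectrum if self.total is None else self.total + spectrum
        self.count += 1

    @property
    def average(self) -> np.ndarray:
        return self.total / self.count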
Using dranspose
Note
Before developing new worker functionality, it is necessary to capture events coming from the streams. Please see capturing events. Here we assume that you already have dumps for all necessary streams.
To analyse data with dranspose, you need to split your task into a map and a reduce function.
Create a new git repository with the following structure:
.
├── Dockerfile
├── README.md
├── requirements.txt
└── src
    ├── reducer.py
    └── worker.py
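The requirements.txt only needs dranspose and the analysis dependencies. How dranspose itself is installed (PyPI, a git URL or a local checkout) depends on your deployment, so treat the following as a minimal sketch to adapt:

# requirements.txt -- minimal sketch, adapt the dranspose source to your setup
dranspose
numpy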
worker.py
The worker gets events and first has to parse the messages from the data stream. It then reduces the available data and forwards a condensed version.
The worker class is instantiated for every new trigger map, which allows using __init__ to reset the state. All calls provide the current set of parameters, which may be required to set the worker up.
Creating a logger is useful for development and production.
import logging

from dranspose.event import EventData
from dranspose.middlewares import contrast
from dranspose.middlewares import xspress

logger = logging.getLogger(__name__)


class FluorescenceWorker:
    def __init__(self, parameters=None, **kwargs):
        self.number = 0
The process_event function gets an EventData object which contains all required streams for the current event. The first step should be to check that the streams required for the analysis are present.
    def process_event(self, event: EventData, parameters=None):
        logger.debug("using parameters %s", parameters)
        if {"contrast", "xspress3"} - set(event.streams.keys()) != set():
            logger.error(
                "missing streams for this worker, only present %s", event.streams.keys()
            )
            return
dranspose includes middlewares to automatically parse the frames into Python objects.
        try:
            con = contrast.parse(event.streams["contrast"])
        except Exception as e:
            logger.error("failed to parse contrast %s", e.__repr__())
            return
        try:
            spec = xspress.parse(event.streams["xspress3"])
        except Exception as e:
            logger.error("failed to parse xspress3 %s", e.__repr__())
            return

        logger.error("contrast: %s", con)
        logger.error("spectrum: %s", spec)
The contrast stream also sends events when a scan is starting or finishing, so only process events where the status is running.
        if con.status == "running":
            # new data
            sx, sy = con.pseudo["x"][0], con.pseudo["y"][0]
            logger.error("process position %s %s", sx, sy)
            roi1 = spec.data[3][parameters["roi1"][0] : parameters["roi1"][1]].sum()
            return {"position": (sx, sy), "concentrations": {"roi1": roi1}}
The returned object will be available to the reducer.
reducer.py
The reducer may also have a setup step in __init__, where it may initialise the special publish attribute. This attribute is automatically exposed via HTTP for live viewers.
from dranspose.event import ResultData


class FluorescenceReducer:
    def __init__(self, parameters=None, **kwargs):
        self.number = 0
        self.publish = {"map": {}}
In process_result, only simple operations are possible, such as appending to a dictionary, as this has to run at the acquisition speed.
    def process_result(self, result: ResultData, parameters=None):
        print(result)
        if result.payload:
            self.publish["map"][result.payload["position"]] = result.payload[
                "concentrations"
            ]
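Live viewers can then fetch the publish dictionary over HTTP. As a client-side sketch, assuming the reducer's result is reachable at http://localhost:5000/api/v1/result/pickle and served as a pickle (both the URL and the format are assumptions, check your deployment for the actual endpoint):

# hypothetical live viewer polling the reducer; the URL and pickle format are assumptions
import pickle

import requests

resp = requests.get("http://localhost:5000/api/v1/result/pickle")
data = pickle.loads(resp.content)
print(data["map"])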
Developing an analysis
The worker and reducer are easily tested with the dranspose replay command. Provide the worker class with -w and the reducer class with -r. You also need to provide the dumped streams from each ingester. If you need parameters, you can provide a JSON or pickle file which will be passed to your worker functions.
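As a sketch, a replay invocation for the layout above could look like the following; the module:Class paths follow the repository structure, while the dump file names and the flags for the dump files and the parameter file are assumptions, so check dranspose replay --help for the exact options.

dranspose replay \
    -w "src.worker:FluorescenceWorker" \
    -r "src.reducer:FluorescenceReducer" \
    -f contrast_dump.cbors xspress3_dump.cbors \
    -p params.json

A matching params.json simply maps parameter names to the values used in the worker above, e.g. the ROI boundaries:

{"roi1": [100, 200]}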