Event and Video Recording

When running pdserver along with pdcam, the websocket server provides a stream of messages which contain state updates and captured images. This can be captured using the pdrecord command line utility, however it may be more useful to control the capture of data from within your own script. For exampe, in many experiments there are brief periods of interest when things are being moved, followed by long times when nothing is happening. Because the captured images contain a lot of bytes, it is often better to record only the interesting parts for later examination.

Note

The recordings are raw protobuf messages, as defined in the purpledrop.protobuf.messages_pb2 module, which is generated from these message definitions.

The pdlog utility can extract an mpeg-4 video from a binary log. For example, if you have a log saved in log.bin:

pdlog --video log.mp4 log.bin will extract the video frames to log.mp4

# event_recorder.py
import asyncio
import threading
import time
import struct
import websockets

class EventRecorder(object):
    def __init__(self, uri):
        """An EventRecorder captures the event stream from the purpledrop API server to a file
        
        When creating an EventRecorder, you must provide the URI to the purpledrop websocket 
        server. Typically, this is run on port 7001 of the Raspberry PI (or whever pdserver is
        run). The URI should use the `ws` protocol, for example: `ws://localhost:7001` is a
        typical websocket URI. 

        To start recording, call `recorder.start('logfile.bin`)`. To stop, call `recorder.stop()`. 

        Recording is run in a background thread.
        """
        self.uri = uri
        self.thread = threading.Thread(target=self.__thread_entry, daemon=True)
        self.start_event = None
        self.output_file = None
        self.stop_event = None
        self.loop = None
        self.init_event = threading.Event()
        self.thread.start()

    def start(self, filepath):
        """Start recording to the provided file"""
        # Make sure wait_for_start task has started
        self.init_event.wait(timeout=1.0)
        if self.output_file is not None:
            raise RuntimeError("Already recording. Call stop() first.")
        self.output_file = filepath
        self.loop.call_soon_threadsafe(self.start_event.set)

    def stop(self):
        """Stop in progress recording"""
        self.loop.call_soon_threadsafe(self.stop_event.set)
        if not self.thread.is_alive():
            raise RuntimeError("EventRecorder thread has terminated.")
        # Wait for capture to finish
        while self.output_file is not None:
            time.sleep(0.1)

    async def __record(self, filepath):
        msg_count = 0
        with open(filepath, 'wb') as f:
            # With default close timeout, it takes a long time to close.
            async with websockets.connect(self.uri, close_timeout=0.2) as ws:
                while True:
                    recv_task = asyncio.create_task(ws.recv())
                    stop_task = asyncio.create_task(self.stop_event.wait())
                    done, pending = await asyncio.wait({recv_task, stop_task}, return_when=asyncio.FIRST_COMPLETED)
                    if stop_task in done:
                        recv_task.cancel()
                        return msg_count

                    msg_count += 1
                    raw_event = recv_task.result()
                    length_bytes = struct.pack('I', len(raw_event))
                    f.write(length_bytes)
                    f.write(raw_event)

    async def __wait_for_start(self):
        self.start_event = asyncio.Event()
        self.stop_event = asyncio.Event()
        self.loop = asyncio.get_event_loop()
        self.init_event.set()
        while True:
            await self.start_event.wait()
            self.start_event.clear()
            msg_count = await self.__record(self.output_file)
            self.output_file = None
            self.stop_event.clear()

    def __thread_entry(self):
        asyncio.run(self.__wait_for_start())

And here’s a quick example of using the event recorder to capture three separate logs at different times:

# event_recorder_demo.py
from event_recorder import EventRecorder
import time

PD_HOST = "mcbride-purpledrop"

rec = EventRecorder(f'ws://{PD_HOST}:7001')

# Record three segments
for i in range(3):
    print("Starting capture {i}")
    # Start the recorder and let it capture for 10 seconds
    rec.start(f'log{i}.bin')
    time.sleep(10.0)
    print ("Stopping Capture")
    rec.stop()
    # Capture is now stopped. Wait 5 seconds before moving on to the next
    time.sleep(5.0)