Archive for the ‘python’ Category

Python gstreamer controller

Tuesday, January 4th, 2011 by lane

I needed a GstController to control the alpha channel on a video crossfade. The gstreamer documentation makes it look easy, but it actually tripped me up for quite some time. I am using the python bindings, and they have some subtlies that took me some time to figure out.

First, I got the general usage out of the Pitivi source code. To create a controller, you do the following:

        controller = gst.Controller(alpha, "alpha")
        controller.set_interpolation_mode("alpha", gst.INTERPOLATE_LINEAR)
        controller.set("alpha", 0, 0.0)
        controller.set("alpha", 5*gst.SECOND, 1.0)

In the above example, alpha is the element to be controlled. This causes the alpha property on the alpha element to linearly change from 0.0 to 1.0 over the course of 5 seconds.

After adding the above code snippet to my application, however, it did not work. To debug it, I ended up checking out the gstreamer source code and after tracing things through using printf()’s, I finally discovered that my controller was getting attached to the alpha as I wanted but then it was getting detached almost immediately thereafter. That made me realize that in python I needed to hold a reference to the controller for the duration of the stream. I was letting the controller variable in the above example go out of scope, at which point it had no more references and therefore got deleted and removed from the alpha element. So the solution was to hold a reference to it as shown below in the example of a video crossfade:

import os, time
import gobject
import pygst
pygst.require("0.10")
import gst

class Main:
    def __init__(self):
        src1 = gst.element_factory_make("filesrc")
        src2 = gst.element_factory_make("filesrc")

        src1.set_property("location", "/home/lane/test1.mp4")
        src2.set_property("location", "/home/lane/test2.mp4")

        dec1 = gst.element_factory_make("decodebin2")
        dec2 = gst.element_factory_make("decodebin2")

        alpha1 = gst.element_factory_make("alpha")
        alpha2 = gst.element_factory_make("alpha")

        self.controller = gst.Controller(alpha2, "alpha")
        self.controller.set_interpolation_mode("alpha", gst.INTERPOLATE_LINEAR)
        self.controller.set("alpha", 0, 0.0)
        self.controller.set("alpha", 5*gst.SECOND, 1.0)

        mixer = gst.element_factory_make("videomixer")

        queue = gst.element_factory_make("queue")
        color = gst.element_factory_make("ffmpegcolorspace")
        sink  = gst.element_factory_make("autovideosink")

        pipeline = gst.Pipeline("pipeline")
        pipeline.add(src1, src2, dec1, dec2, alpha1, alpha2, mixer, queue, color, sink)

        def on_pad(comp, pad, data, element):
            sinkpad = element.get_compatible_pad(pad, pad.get_caps())
            pad.link(sinkpad)

        dec1.connect("new-decoded-pad", on_pad, alpha1)
        dec2.connect("new-decoded-pad", on_pad, alpha2)
        src1.link(dec1)
        src2.link(dec2)
        alpha1.link(mixer)
        alpha2.link(mixer)
        mixer.link(queue)
        queue.link(color)
        color.link(sink)

        self.pipeline = pipeline

    def start(self):
        self.running = True
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.enable_sync_message_emission()
        bus.connect("message", self.on_message)
        self.pipeline.set_state(gst.STATE_PLAYING)

    def on_exit(self, *args):
        self.pipeline.set_state(gst.STATE_NULL)
        self.running = False

    def on_message(self, bus, message):
        t = message.type
	if t == gst.MESSAGE_EOS:
            self.on_exit()
	elif t == gst.MESSAGE_ERROR:
            err, debug = message.parse_error()
	    print "Error: %s" % err, debug
            self.on_exit()

loop = gobject.MainLoop()
gobject.threads_init()
context = loop.get_context()

m = Main()
m.start()

while m.running:
    context.iteration(True)

Some things to note in the above code:

  • In order to prevent the controller from going out of scope and getting deleted, I added it as an attribute to the Main object (i.e. self.controller).
  • I am manually iterating the mainloop. This is so that I do not have to create any gtk stuff. I recently discovered this technique and I use it in command line tools that do not require X11 or a $DISPLAY environment variable set.

Gstreamer Video Crossfade Example

Tuesday, January 4th, 2011 by lane

It took me an amazingly long amount of time to figure this one out, but here is a simplified python script to perform a video crossfade in gstreamer using the gnonlin package.


import os, time
import gobject
import pygst
pygst.require("0.10")
import gst

gst.MILLISECOND = gst.SECOND / 1000

def get_crossfade(duration):
    # To crossfade, we add an alpha channel to both streams. Then a video
    # mixer mixes them according to the alpha channel. We put a control
    # on the alpha channel to linearly sweep it over the duration of the
    # crossfade. The returned bin should get placed in a gnloperation.
    # The reason to put the alpha and final ffmpegcolorspace conversion
    # in this bin is that are only applied during the crossfade and not
    # all the time (saves some processing time).
    bin = gst.Bin()
    alpha1 = gst.element_factory_make("alpha")
    alpha2 = gst.element_factory_make("alpha")
    mixer  = gst.element_factory_make("videomixer")
    color  = gst.element_factory_make("ffmpegcolorspace")

    bin.add(alpha1, alpha2, mixer, color)
    alpha1.link(mixer)
    alpha2.link(mixer)
    mixer.link(color)

    controller = gst.Controller(alpha2, "alpha")
    controller.set_interpolation_mode("alpha", gst.INTERPOLATE_LINEAR)
    controller.set("alpha", 0, 0.0)
    controller.set("alpha", duration * gst.MILLISECOND, 1.0)

    bin.add_pad(gst.GhostPad("sink1", alpha1.get_pad("sink")))
    bin.add_pad(gst.GhostPad("sink2", alpha2.get_pad("sink")))
    bin.add_pad(gst.GhostPad("src",   color.get_pad("src")))

    return bin, controller # return the controller otherwise it will go out of scope and get deleted before it is even applied

class Main:
    def __init__(self):
        dur1 = 5000 # duration (in ms) to play of first clip
        dur2 = 5000 # duration (in ms) to play of second clip
        dur_crossfade = 500 # number of milliseconds to crossfade for

        # we play two clips serially with a crossfade between them
        # using the gnonlin gnlcomposition element.
        comp = gst.element_factory_make("gnlcomposition")

        # setup first clip
        src1 = gst.element_factory_make("gnlfilesource")
        comp.add(src1)
        src1.set_property("location", "/home/lane/work/sshow/src/test1.mp4")
        src1.set_property("start",          0    * gst.MILLISECOND)
        src1.set_property("duration",       dur1 * gst.MILLISECOND)
        src1.set_property("media-start",    0    * gst.MILLISECOND)
        src1.set_property("media-duration", dur1 * gst.MILLISECOND)
        src1.set_property("priority",       1)

        # setup second clip
        src2 = gst.element_factory_make("gnlfilesource")
        comp.add(src2)
        src2.set_property("location", "/home/lane/work/sshow/src/test2.mp4")
        src2.set_property("start",  (dur1-dur_crossfade) * gst.MILLISECOND)
        src2.set_property("duration",       dur2 * gst.MILLISECOND)
        src2.set_property("media-start",    0    * gst.MILLISECOND)
        src2.set_property("media-duration", dur2 * gst.MILLISECOND)
        src2.set_property("priority",       2)

        # setup the crossfade
        op = gst.element_factory_make("gnloperation")
        fade, self.controller = get_crossfade(dur_crossfade)
        op.add(fade)
        op.set_property("start",   (dur1-dur_crossfade) * gst.MILLISECOND)
        op.set_property("duration",       dur_crossfade * gst.MILLISECOND)
        op.set_property("media-start",    0             * gst.MILLISECOND)
        op.set_property("media-duration", dur_crossfade * gst.MILLISECOND)
        op.set_property("priority",       0)
        comp.add(op)

        # setup the backend viewer
        queue = gst.element_factory_make("queue")
        sink  = gst.element_factory_make("autovideosink")

        pipeline = gst.Pipeline("pipeline")
        pipeline.add(comp, queue, sink)

        def on_pad(comp, pad, element):
            sinkpad = element.get_compatible_pad(pad, pad.get_caps())
            pad.link(sinkpad)

        comp.connect("pad-added", on_pad, queue)
        queue.link(sink)

        self.pipeline = pipeline

    def start(self):
        self.running = True
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.enable_sync_message_emission()
        bus.connect("message", self.on_message)
        self.pipeline.set_state(gst.STATE_PLAYING)

    def on_exit(self, *args):
        self.pipeline.set_state(gst.STATE_NULL)
        self.running = False

    def on_message(self, bus, message):
        t = message.type
	if t == gst.MESSAGE_EOS:
            self.pipeline.set_state(gst.STATE_NULL)
            self.on_exit()
	elif t == gst.MESSAGE_ERROR:
            err, debug = message.parse_error()
	    print "Error: %s" % err, debug
            self.on_exit()

loop = gobject.MainLoop()
gobject.threads_init()
context = loop.get_context()

m = Main()
m.start()

while m.running:
    context.iteration(True)

I struggled to get the smpte transitions working using the smpte plugin element and never got it. Using the smptealpha plugin element, however, I was able to get transitions working using the same approach as the crossfade example above. All I had to do was create a new function:


def get_smpte(duration, transition=1):
    # To crossfade, we add an alpha channel to both streams. Then a video
    # mixer mixes them according to the alpha channel. We put a control
    # on the alpha channel to linearly sweep it over the duration of the
    # crossfade. The returned bin should get placed in a gnloperation.
    # The reason to put the alpha and final ffmpegcolorspace conversion
    # in this bin is that are only applied during the crossfade and not
    # all the time (saves some processing time).
    bin = gst.Bin()
    alpha1 = gst.element_factory_make("alpha")
    smpte  = gst.element_factory_make("smptealpha")
    mixer  = gst.element_factory_make("videomixer")
    color  = gst.element_factory_make("ffmpegcolorspace")

    bin.add(alpha1, smpte, mixer, color)
    alpha1.link(mixer)
    smpte.link(mixer)
    mixer.link(color)

    smpte.set_property("type", transition)

    controller = gst.Controller(smpte, "position")
    controller.set_interpolation_mode("position", gst.INTERPOLATE_LINEAR)
    controller.set("position", 0, 1.0)
    controller.set("position", duration * gst.MILLISECOND, 0.0)

    bin.add_pad(gst.GhostPad("sink1", alpha1.get_pad("sink")))
    bin.add_pad(gst.GhostPad("sink2", smpte.get_pad("sink")))
    bin.add_pad(gst.GhostPad("src",   color.get_pad("src")))

    return bin, controller

Now I can call this function get_smpte() place of the get_crossfade() function and get an smpte transition instead. Works great so far.

Dynamic method creation in python

Thursday, June 26th, 2008 by lane

I recently wrote some python bindings for a third party, closed source shared library. This library was a USB communication API to a board made by Opal Kelly. I was writing a multi threaded python application for video streaming. The video stream went through the shared library on a worker thread. The GUI thread, however, also had access to the shared library giving the user the ability to change settings while the thread was running. The shared library was not thread safe, however, the global interpreter lock kept things safe. I found the global interpreter lock when trying to figure out why the video stream through this shared library was so slow and choppy. The reason was that the streaming thread would not context switch while waiting on I/O. After adding the Py_BEGIN_ALLOW_THREADS and Py_END_ALLOW_THREADS macros around my I/O calls, my video stream started behaving nicely, but now I had a thread safety issue because the GUI thread could now access the shared library while the capture thread was waiting on I/O.

To solve this, I wanted to wrap the python bindings in a class that would use a mutex to lock access to every function in the shared library. I did not want to have individually wrap each function call either. So I used the __getattribute__ function to globally wrap each function like this:

class FrontPanelThreadSafe(ok.FrontPanel):
    """This class wraps up the ok.FrontPanel to make it thread safe by
    creating a lock that must be acquired before any function call in
    the base class can be called."""

    def __init__(self):
        ok.FrontPanel.__init__(self)
        self.lock = threading.Lock()

    def __getattribute__(self, key):
        """This function does all the magic.  All function calls to
        this object go through this method.  This method passes them
        on to the base class after it acquires the lock to make the
        access to the opal kelly board atomic."""
        if(key == "lock"):
            return object.__getattribute__(self, key)
        else:
            def func(*args, **kw):
                try:
                    self.lock.acquire()
                    return ok.FrontPanel.__getattribute__(self, key)(*args, **kw)
                finally:
                    self.lock.release()
            return func

Fortunately, right now the base class does not have any attributes–just methods. If it had attributes, then I would need to add a condition to distinguish the two.

linux gpib on Fedora 5 with swig’d python bindings - National Instraments USB HS card

Sunday, April 6th, 2008 by lane
  • Download linux-gpib version 3.2.10 from http://linux-gpib.sourceforge.net
  • Build the linux-gpib software
    ./configure --disable-guile-binding --disable-perl-binding --disable-php-binding  --disable-python-binding --disable-tcl-binding --prefix=/usr
    make
    sudo make install
  • Created a file /etc/udev/rules.d/60-gpib.rules with the following contents
    BUS=="usb", SYSFS{idVendor}=="3923", MODE="0666", RUN+="/usr/sbin/gpib_config"
    KERNEL="gpib[0-9]*", MODE="0666"

    I found that the kernel was oops’ing with these udev lines, so I commented out the first line that runs gpib_config and then manually run gpib_config. My guess is that the device needs some time to come on-line and that a delay is necessary. I am not sure how to force udev to wait though.

  • Reload the udev rules
    sudo udevcontrol reload_rules
  • Create /etc/gpib.conf as follows:
    interface {
    	minor = 0
    	board_type = "ni_usb_b"
    	name = "usb"
    	pad = 0
    	sad = 0
    	timeout = T3s
    	master = yes
    }
  • From the manual

    Unlike the USB-B, the USB-HS does not require a firmware upload to become functional after being
    plugged in. The linux-gpib tarball contains hotplug scripts which will automatically run gpib_config
    after the device is plugged in.

  • Unpack this tar file in the languages subdirectory swig’d linux gpib.
  • cd into the py_swig directory. run make and make install
  • Now you can import gpib in python
  • Here is some examples of how to use it
    import gpib
    
    class hp34401A:
        def __init__(self, addr, minor=0, timeout=11):
            self.ud = gpib.ibdev(minor, addr, 0, timeout, 1, 0)
            if(self.ud < 0):
                raise RuntimeError("Error creating hp8656B")
    
        def meas(self):
            gpib.ibWrt(self.ud, "RST*");
            gpib.ibWrt(self.ud, "MEAS:VOLT:DC?");
            status, data = gpib.ibRd(self.ud);
            if(gpib.ThreadIberr()):
                raise Exception("Read error: status = 0x%x" % status)
            else:
                print data[:gpib.ThreadIbcnt()]
                return float(data[:gpib.ThreadIbcnt()])
    
    class hp8656B:
        def __init__(self, addr, minor=0, timeout=11):
            self.ud = gpib.ibdev(minor, addr, 0, timeout, 1, 0)
            if(self.ud < 0):
                raise RuntimeError("Error creating hp8656B")
    
        def setFreq(self, freq):
    
            # 9 digits max
            gpib.ibWrt(self.ud, "FR"+str(freq)+"MZ")
    
        def setAmpl(self, ampl):
            # 9 digits max
            gpib.ibWrt(self.ud, "AP"+str(ampl)+"VL")
    
    class hp8644B:
        def __init__(self, addr, minor=0, timeout=11):
            self.ud = gpib.ibdev(minor, addr, 0, timeout, 1, 0)
            if(self.ud < 0):
                raise RuntimeError("Error creating hp8656B")
    
        def setFreq(self, freq):
    
            # 9 digits max
            gpib.ibWrt(self.ud, "FR"+str(freq)+"MZ")
    
        def setAmpl(self, ampl):
            # 9 digits max
            gpib.ibWrt(self.ud, "AP"+str(ampl)+"VL")
    
    if __name__ == "__main__":
    
        hp = hp34401A(12)

Plotting on left and right axis using matplotlib and numpy

Wednesday, March 19th, 2008 by lane

Example of Plotting on Left and Right axis
Plotting two lines on independent y-axis is useful when trying to overlay data that is on dramatically different scales. The following example (left_right.py) shows how to do this with matplotlib and numpy in python. I am not sure how to force the y-ticks on the left and right to automatically line up. If anyone know, please leave a comment with how to do it.

from pylab import *
from numpy import *

N = 100

# Create some random data for plotting on left axis and for the right axis
yLeft  = random.randn(N) + linspace(-25,25,N)
yRight = linspace(-50,50,N)**2

# Create left axis plots
axL = subplot(1,1,1)
plot(yLeft, '.-b')
grid(True)
ylabel("Left Y-Axis Data")
xlabel("X-Axis Data")
title("Data Plotted on Left and Right Axis")

# Create right axis and plots.  It is the frameon=False that makes this
# plot transparent so that you can see the left axis plot that will be
# underneath it.  The sharex option causes them to share the x axis.
axR = subplot(1,1,1, sharex=axL, frameon=False)
axR.yaxis.tick_right()
axR.yaxis.set_label_position("right")
plot(yRight, '.-g')
ylabel("Right Y-Axis Data")

# save the figure
savefig("left_right.png", dpi=72)