This week, I came across a fun Python project named psutil on Google Code. It says it works on Linux, Windows, OSX and FreeBSD. What it does is grab all the running processes and gives you information on them and also gives you the ability to terminate them. So I thought it would be fun to put a GUI on top of it and create my own Task Manager / Process Monitor application with wxPython. If you have a moment, you can come along for the journey as I take you through 4 iterations of my code.

The First Prototype

My first version just shows what’s currently running at the time the application itself is run and uses a wx.Timer to update every 5 seconds. I used the ObjectListView widget to display the data, which isn’t actually included in wxPython, so you’ll need to go grab that if you want to run the code.

import psutil
import wx
from ObjectListView import ObjectListView, ColumnDefn
 
########################################################################
class Process(object):
    """ """
 
    #----------------------------------------------------------------------
    def __init__(self, name, pid, exe, user, cpu, mem, desc=None):
        """Constructor"""
        self.name = name
        self.pid = pid
        self.exe = exe
        self.user = user
        self.cpu = cpu
        self.mem = mem
        #self.desc = desc
 
########################################################################
class MainPanel(wx.Panel):
    """"""
 
    #----------------------------------------------------------------------
    def __init__(self, parent):
        """Constructor"""
        wx.Panel.__init__(self, parent=parent)
        self.procs = []
 
        self.procmonOlv = ObjectListView(self, style=wx.LC_REPORT|wx.SUNKEN_BORDER)
        self.setProcs()
 
        mainSizer = wx.BoxSizer(wx.VERTICAL)
        mainSizer.Add(self.procmonOlv, 1, wx.EXPAND|wx.ALL, 5)
        self.SetSizer(mainSizer)
        self.updateDisplay()
 
        # check for updates every 5 seconds
        self.timer = wx.Timer(self)
        self.Bind(wx.EVT_TIMER, self.update, self.timer)
        self.timer.Start(5000)
 
    #----------------------------------------------------------------------
    def setProcs(self):
        """"""
        cols = [
            ColumnDefn("name", "left", 150, "name"),
            ColumnDefn("pid", "left", 50, "pid"),
            ColumnDefn("exe location", "left", 100, "exe"),
            ColumnDefn("username", "left", 75, "user"),
            ColumnDefn("cpu", "left", 75, "cpu"),
            ColumnDefn("mem", "left", 75, "mem"),
            #ColumnDefn("description", "left", 200, "desc")
            ]
        self.procmonOlv.SetColumns(cols)
        self.procmonOlv.SetObjects(self.procs)
 
    #----------------------------------------------------------------------
    def update(self, event):
        """"""
        self.updateDisplay()
 
    #----------------------------------------------------------------------
    def updateDisplay(self):
        """"""
        pids = psutil.get_pid_list()
        for pid in pids:
 
            try:
                p = psutil.Process(pid)
                new_proc = Process(p.name,
                                   str(p.pid),
                                   p.exe,
                                   p.username,
                                   str(p.get_cpu_percent()),
                                   str(p.get_memory_percent())
                                   )
                self.procs.append(new_proc)
            except:
                pass
 
        self.setProcs()
 
########################################################################
class MainFrame(wx.Frame):
    """"""
 
    #----------------------------------------------------------------------
    def __init__(self):
        """Constructor"""
        wx.Frame.__init__(self, None, title="PyProcMon")
        panel = MainPanel(self)
        self.Show()
 
if __name__ == "__main__":
    app = wx.App(False)
    frame = MainFrame()
    app.MainLoop()

There’s a fairly obvious issue with this implementation. Can you tell what it is? Well the process for getting the process information is kind of slow so the GUI pauses every 5 seconds. That’s annoying! So let’s add threading to fix that issue.

Adding Threading to the Application for Alpha 2

In this second version, we add threading and pubsub to make passing information from the thread to the GUI easier. Note that we also need to use wx.CallAfter to call pubsub as pubsub is not thread-safe.

import psutil # http://code.google.com/p/psutil/
import wx
 
from ObjectListView import ObjectListView, ColumnDefn
from threading import Thread
from wx.lib.pubsub import Publisher
 
########################################################################
class ProcThread(Thread):
    """
    Gets all the process information we need as psutil isn't very fast
    """
 
    #----------------------------------------------------------------------
    def __init__(self):
        """Constructor"""
        Thread.__init__(self)
        self.start() 
 
    #----------------------------------------------------------------------
    def run(self):
        """"""
        pids = psutil.get_pid_list()
        procs = []
        for pid in pids:
            try:
                p = psutil.Process(pid)
                new_proc = Process(p.name,
                                   str(p.pid),
                                   p.exe,
                                   p.username,
                                   str(p.get_cpu_percent()),
                                   str(p.get_memory_percent())
                                   )
                procs.append(new_proc)
            except:
                print "Error getting pid #%s information" % pid
 
        # send pids to GUI
        wx.CallAfter(Publisher().sendMessage, "update", procs)
 
########################################################################
class Process(object):
    """
    Definition of Process model for ObjectListView
    """
 
    #----------------------------------------------------------------------
    def __init__(self, name, pid, exe, user, cpu, mem, desc=None):
        """Constructor"""
        self.name = name
        self.pid = pid
        self.exe = exe
        self.user = user
        self.cpu = cpu
        self.mem = mem
        #self.desc = desc
 
########################################################################
class MainPanel(wx.Panel):
    """"""
 
    #----------------------------------------------------------------------
    def __init__(self, parent):
        """Constructor"""
        wx.Panel.__init__(self, parent=parent)
        self.procs = []
 
        self.procmonOlv = ObjectListView(self, style=wx.LC_REPORT|wx.SUNKEN_BORDER)
        self.setProcs()
 
        mainSizer = wx.BoxSizer(wx.VERTICAL)
        mainSizer.Add(self.procmonOlv, 1, wx.EXPAND|wx.ALL, 5)
        self.SetSizer(mainSizer)
 
        # check for updates every 5 seconds
        self.timer = wx.Timer(self)
        self.Bind(wx.EVT_TIMER, self.update, self.timer)
        self.timer.Start(15000)
        self.setProcs()
 
        # create a pubsub receiver
        Publisher().subscribe(self.updateDisplay, "update")
 
    #----------------------------------------------------------------------
    def setProcs(self):
        """"""
        cols = [
            ColumnDefn("name", "left", 150, "name"),
            ColumnDefn("pid", "left", 50, "pid"),
            ColumnDefn("exe location", "left", 100, "exe"),
            ColumnDefn("username", "left", 75, "user"),
            ColumnDefn("cpu", "left", 75, "cpu"),
            ColumnDefn("mem", "left", 75, "mem"),
            #ColumnDefn("description", "left", 200, "desc")
            ]
        self.procmonOlv.SetColumns(cols)
        self.procmonOlv.SetObjects(self.procs)
        self.procmonOlv.sortAscending = True
 
    #----------------------------------------------------------------------
    def update(self, event):
        """
        Start a thread to get the pid information
        """
        self.timer.Stop()
        ProcThread()
 
    #----------------------------------------------------------------------
    def updateDisplay(self, msg):
        """"""
        self.procs = msg.data
        self.setProcs()
        if not self.timer.IsRunning():
            self.timer.Start(15000)
 
########################################################################
class MainFrame(wx.Frame):
    """"""
 
    #----------------------------------------------------------------------
    def __init__(self):
        """Constructor"""
        wx.Frame.__init__(self, None, title="PyProcMon")
        panel = MainPanel(self)
        self.Show()
 
if __name__ == "__main__":
    app = wx.App(False)
    frame = MainFrame()
    app.MainLoop()

We also increased the number of seconds between updates to 15. I did this mostly because it was updating too fast and I wasn’t able to get a good look at my list before it updated again. At this point, I noticed that I couldn’t change the column sizes without them resetting every update. I also wanted the application to keep track of which column I had sorted by and what my last selection was. Finally, I needed a way to kill processes.

Take 3: Adding Basic Features

So in this third iteration, we add all that. Check it out:

import psutil # http://code.google.com/p/psutil/
import wx
 
from ObjectListView import ObjectListView, ColumnDefn
from threading import Thread
from wx.lib.pubsub import Publisher
 
########################################################################
class ProcThread(Thread):
    """
    Gets all the process information we need as psutil isn't very fast
    """
 
    #----------------------------------------------------------------------
    def __init__(self):
        """Constructor"""
        Thread.__init__(self)
        self.start() 
 
    #----------------------------------------------------------------------
    def run(self):
        """"""
        pids = psutil.get_pid_list()
        procs = []
        for pid in pids:
            try:
                p = psutil.Process(pid)
                new_proc = Process(p.name,
                                   str(p.pid),
                                   p.exe,
                                   p.username,
                                   str(p.get_cpu_percent()),
                                   str(p.get_memory_percent())
                                   )
                procs.append(new_proc)
            except:
                pass
 
        # send pids to GUI
        wx.CallAfter(Publisher().sendMessage, "update", procs)
 
########################################################################
class Process(object):
    """
    Definition of Process model for ObjectListView
    """
 
    #----------------------------------------------------------------------
    def __init__(self, name, pid, exe, user, cpu, mem, desc=None):
        """Constructor"""
        self.name = name
        self.pid = pid
        self.exe = exe
        self.user = user
        self.cpu = cpu
        self.mem = mem
        #self.desc = desc
 
########################################################################
class MainPanel(wx.Panel):
    """"""
 
    #----------------------------------------------------------------------
    def __init__(self, parent):
        """Constructor"""
        wx.Panel.__init__(self, parent=parent)
        self.currentSelection = None
        self.gui_shown = False
        self.procs = []
        self.sort_col = 0
 
        self.col_w = {"name":175,
                      "pid":50,
                      "exe":300,
                      "user":175,
                      "cpu":60,
                      "mem":75}
 
        self.procmonOlv = ObjectListView(self, style=wx.LC_REPORT|wx.SUNKEN_BORDER)
        self.procmonOlv.Bind(wx.EVT_LIST_COL_CLICK, self.onColClick)
        self.procmonOlv.Bind(wx.EVT_LIST_ITEM_SELECTED, self.onSelect)
        #self.procmonOlv.Select
        self.setProcs()
 
        endProcBtn = wx.Button(self, label="End Process")
        endProcBtn.Bind(wx.EVT_BUTTON, self.onKillProc)
 
        mainSizer = wx.BoxSizer(wx.VERTICAL)
        mainSizer.Add(self.procmonOlv, 1, wx.EXPAND|wx.ALL, 5)
        mainSizer.Add(endProcBtn, 0, wx.ALIGN_RIGHT|wx.ALL, 5)
        self.SetSizer(mainSizer)
 
        # check for updates every 15 seconds
        self.timer = wx.Timer(self)
        self.Bind(wx.EVT_TIMER, self.update, self.timer)
        self.update("")
        self.setProcs()
 
        # create a pubsub receiver
        Publisher().subscribe(self.updateDisplay, "update")
 
    #----------------------------------------------------------------------
    def onColClick(self, event):
        """
        Remember which column to sort by, currently only does ascending
        """
        self.sort_col = event.GetColumn()
 
    #----------------------------------------------------------------------
    def onKillProc(self, event):
        """
        Kill the selected process by pid
        """
        obj = self.procmonOlv.GetSelectedObject()
        print
        pid = int(obj.pid)
        try:
            p = psutil.Process(pid)
            p.terminate()
            self.update("")
        except Exception, e:
            print "Error: " + e
 
    #----------------------------------------------------------------------
    def onSelect(self, event):
        """"""
        item = event.GetItem()
        itemId = item.GetId()
        self.currentSelection = itemId
        print
 
    #----------------------------------------------------------------------
    def setProcs(self):
        """"""
        cw = self.col_w
        # change column widths as necessary
        if self.gui_shown:
            cw["name"] = self.procmonOlv.GetColumnWidth(0)
            cw["pid"] = self.procmonOlv.GetColumnWidth(1)
            cw["exe"] = self.procmonOlv.GetColumnWidth(2)
            cw["user"] = self.procmonOlv.GetColumnWidth(3)
            cw["cpu"] = self.procmonOlv.GetColumnWidth(4)
            cw["mem"] = self.procmonOlv.GetColumnWidth(5)
 
        cols = [
            ColumnDefn("name", "left", cw["name"], "name"),
            ColumnDefn("pid", "left", cw["pid"], "pid"),
            ColumnDefn("exe location", "left", cw["exe"], "exe"),
            ColumnDefn("username", "left", cw["user"], "user"),
            ColumnDefn("cpu", "left", cw["cpu"], "cpu"),
            ColumnDefn("mem", "left", cw["mem"], "mem"),
            #ColumnDefn("description", "left", 200, "desc")
            ]
        self.procmonOlv.SetColumns(cols)
        self.procmonOlv.SetObjects(self.procs)
        self.procmonOlv.SortBy(self.sort_col)
        if self.currentSelection:
            self.procmonOlv.Select(self.currentSelection)
            self.procmonOlv.SetFocus()
        self.gui_shown = True
 
    #----------------------------------------------------------------------
    def update(self, event):
        """
        Start a thread to get the pid information
        """
        print "update thread started!"
        self.timer.Stop()
        ProcThread()
 
    #----------------------------------------------------------------------
    def updateDisplay(self, msg):
        """"""
        print "thread done, updating display!"
        self.procs = msg.data
        self.setProcs()
        if not self.timer.IsRunning():
            self.timer.Start(15000)
 
########################################################################
class MainFrame(wx.Frame):
    """"""
 
    #----------------------------------------------------------------------
    def __init__(self):
        """Constructor"""
        wx.Frame.__init__(self, None, title="PyProcMon", size=(1024, 768))
        panel = MainPanel(self)
        self.Show()
 
#----------------------------------------------------------------------
if __name__ == "__main__":
    app = wx.App(False)
    frame = MainFrame()
    app.MainLoop()

You’ll note that we had to catch a couple of events to keep track of the column sorting and what the current selection was. I wasn’t able to figure out to tell which direction the sort was in or how to change that though, so that’s still on my TODO list. However, there was one other feature I wanted to add: a statusbar with information on the number of processes, CPU usage and memory.

The Final Product: PyProcMon

For the final version (for now anyway), we add a 3-piece statusbar and another pubsub receiver/publisher to the mix. We also split out some of the code into their own modules. The threading code goes into controller.py, the Process class goes into model.py and the rest stays where it was. We’ll start with the controller:

# controller.py
########################################################################
import psutil
import wx
 
from model import Process
from threading import Thread
from wx.lib.pubsub import Publisher
 
########################################################################
class ProcThread(Thread):
    """
    Gets all the process information we need as psutil isn't very fast
    """
 
    #----------------------------------------------------------------------
    def __init__(self):
        """Constructor"""
        Thread.__init__(self)
        self.start() 
 
    #----------------------------------------------------------------------
    def run(self):
        """"""
        pids = psutil.get_pid_list()
        procs = []
        cpu_percent = 0
        mem_percent = 0
        for pid in pids:
            try:
                p = psutil.Process(pid)
                cpu = p.get_cpu_percent()
                mem = p.get_memory_percent()
                new_proc = Process(p.name,
                                   str(p.pid),
                                   p.exe,
                                   p.username,
                                   str(cpu),
                                   str(mem)
                                   )
                procs.append(new_proc)
                cpu_percent += cpu
                mem_percent += mem
            except:
                pass
 
        # send pids to GUI
        wx.CallAfter(Publisher().sendMessage, "update", procs)
 
        number_of_procs = len(procs)
        wx.CallAfter(Publisher().sendMessage, "update_status",
                     (number_of_procs, cpu_percent, mem_percent))

You’ve already seen this, so let’s move on to the model:

# model.py
########################################################################
class Process(object):
    """
    Definition of Process model for ObjectListView
    """
 
    #----------------------------------------------------------------------
    def __init__(self, name, pid, exe, user, cpu, mem, desc=None):
        """Constructor"""
        self.name = name
        self.pid = pid
        self.exe = exe
        self.user = user
        self.cpu = cpu
        self.mem = mem

That’s super simple! Note that we don’t even need to import anything into this one. Now let’s see how the meat of project turned out:

# pyProcMon.py
import controller
import psutil # http://code.google.com/p/psutil/
import wx
 
from ObjectListView import ObjectListView, ColumnDefn
from wx.lib.pubsub import Publisher
 
########################################################################
class MainPanel(wx.Panel):
    """"""
 
    #----------------------------------------------------------------------
    def __init__(self, parent):
        """Constructor"""
        wx.Panel.__init__(self, parent=parent)
        self.currentSelection = None
        self.gui_shown = False
        self.procs = []
        self.sort_col = 0
 
        self.col_w = {"name":175,
                      "pid":50,
                      "exe":300,
                      "user":175,
                      "cpu":60,
                      "mem":75}
 
        self.procmonOlv = ObjectListView(self, style=wx.LC_REPORT|wx.SUNKEN_BORDER)
        self.procmonOlv.Bind(wx.EVT_LIST_COL_CLICK, self.onColClick)
        self.procmonOlv.Bind(wx.EVT_LIST_ITEM_SELECTED, self.onSelect)
        #self.procmonOlv.Select
        self.setProcs()
 
        endProcBtn = wx.Button(self, label="End Process")
        endProcBtn.Bind(wx.EVT_BUTTON, self.onKillProc)
 
        mainSizer = wx.BoxSizer(wx.VERTICAL)
        mainSizer.Add(self.procmonOlv, 1, wx.EXPAND|wx.ALL, 5)
        mainSizer.Add(endProcBtn, 0, wx.ALIGN_RIGHT|wx.ALL, 5)
        self.SetSizer(mainSizer)
 
        # check for updates every 15 seconds
        self.timer = wx.Timer(self)
        self.Bind(wx.EVT_TIMER, self.update, self.timer)
        self.update("")
        self.setProcs()
 
        # create a pubsub receiver
        Publisher().subscribe(self.updateDisplay, "update")
 
    #----------------------------------------------------------------------
    def onColClick(self, event):
        """
        Remember which column to sort by, currently only does ascending
        """
        self.sort_col = event.GetColumn()
 
    #----------------------------------------------------------------------
    def onKillProc(self, event):
        """
        Kill the selected process by pid
        """
        obj = self.procmonOlv.GetSelectedObject()
        print
        pid = int(obj.pid)
        try:
            p = psutil.Process(pid)
            p.terminate()
            self.update("")
        except Exception, e:
            print "Error: " + e
 
    #----------------------------------------------------------------------
    def onSelect(self, event):
        """
        Gets called when an item is selected and helps keep track of 
        what item is selected
        """
        item = event.GetItem()
        itemId = item.GetId()
        self.currentSelection = itemId
 
    #----------------------------------------------------------------------
    def setProcs(self):
        """
        Updates the ObjectListView widget display
        """
        cw = self.col_w
        # change column widths as necessary
        if self.gui_shown:
            cw["name"] = self.procmonOlv.GetColumnWidth(0)
            cw["pid"] = self.procmonOlv.GetColumnWidth(1)
            cw["exe"] = self.procmonOlv.GetColumnWidth(2)
            cw["user"] = self.procmonOlv.GetColumnWidth(3)
            cw["cpu"] = self.procmonOlv.GetColumnWidth(4)
            cw["mem"] = self.procmonOlv.GetColumnWidth(5)
 
        cols = [
            ColumnDefn("name", "left", cw["name"], "name"),
            ColumnDefn("pid", "left", cw["pid"], "pid"),
            ColumnDefn("exe location", "left", cw["exe"], "exe"),
            ColumnDefn("username", "left", cw["user"], "user"),
            ColumnDefn("cpu", "left", cw["cpu"], "cpu"),
            ColumnDefn("mem", "left", cw["mem"], "mem"),
            #ColumnDefn("description", "left", 200, "desc")
            ]
        self.procmonOlv.SetColumns(cols)
        self.procmonOlv.SetObjects(self.procs)
        self.procmonOlv.SortBy(self.sort_col)
        if self.currentSelection:
            self.procmonOlv.Select(self.currentSelection)
            self.procmonOlv.SetFocus()
        self.gui_shown = True
 
    #----------------------------------------------------------------------
    def update(self, event):
        """
        Start a thread to get the pid information
        """
        print "update thread started!"
        self.timer.Stop()
        controller.ProcThread()
 
    #----------------------------------------------------------------------
    def updateDisplay(self, msg):
        """
        Catches the pubsub message from the thread and updates the display
        """
        print "thread done, updating display!"
        self.procs = msg.data
        self.setProcs()
        if not self.timer.IsRunning():
            self.timer.Start(15000)
 
########################################################################
class MainFrame(wx.Frame):
    """"""
 
    #----------------------------------------------------------------------
    def __init__(self):
        """Constructor"""
        wx.Frame.__init__(self, None, title="PyProcMon", size=(1024, 768))
        panel = MainPanel(self)
 
        # set up the statusbar
        self.CreateStatusBar()
        self.StatusBar.SetFieldsCount(3)
        self.StatusBar.SetStatusWidths([200, 200, 200])
 
        # create a pubsub receiver
        Publisher().subscribe(self.updateStatusbar, "update_status")
 
        self.Show()
 
    #----------------------------------------------------------------------
    def updateStatusbar(self, msg):
        """"""
        procs, cpu, mem = msg.data
        self.SetStatusText("Processes: %s" % procs, 0)
        self.SetStatusText("CPU Usage: %s" % cpu, 1)
        self.SetStatusText("Physical Memory: %s" % mem, 2)
 
 
#----------------------------------------------------------------------
if __name__ == "__main__":
    app = wx.App(False)
    frame = MainFrame()
    app.MainLoop()

The main thing here is the added Statusbar and its updating mechanism. It took a little fiddling to get it right, but now it updates along with the display.

Wrapping Up

You may be wondering why the pid information gathering process is in a try/except clause. Well some pids don’t want to give up their information or manage to stop existing between the time I grab the list and the time I try to extract information, so I have to skip those. There’s actually a LOT of pids that are like that. I also wrapped the killing process in a try/except as I expect some processes can’t be killed. Otherwise this has worked pretty well. Here are just a few features I’d like to add: right-click kill / context menu, a confirmation dialog, a menubar with some options (close, start a new program, about).

I hope you enjoyed learning along with me and got something cool in the process. Happy hacking!

Source Code

Print Friendly