A simple python sandbox for Celery and not

At the end of this post you will find the source code of a python sandbox that I write for have a general sandbox that work for all my celery tasks.
Anyway this is a general python code and you can use it also without Celery but here I will give you also some example to how to use it in easy and fast way with Celery.

The sandbox is wrote as a Context Manager (under suggestion of asksol ;-) ) with an entry point that will create the folders and an exit point that will clean it.
The class create a folder structure like this:

/sandboxradix/tasks.mytask.dosomething/taskpid/

On the exit point only the taskpid folder will be erased, in this way you can launch several tasks of the same type at the same time
Let’s see how you can use it for you celery task.

In my personal opinion the best way to do it is to create a base class for all your tasks that extend the celery task class and inside it implement some general useful methods like the sandbox behaviour.
Following you can see an example of a general base class that use my sandbox context manager.

import settings
from celery.task import Task
from sandBox import SandBox

class toforgeTask(Task):

    def __call__(self, *args, **kwargs):
        """In celery task this function call the run method, here you can
        set some environment variable before the run of the task"""

        self.sandbox = SandBox(self.name, self.request.id,
                               settings.PATH_SANDBOX)
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        #: If the sandbox is initialized we clean it
        if self.sandbox.isInitialized():
            self.sandbox.__exit__()

    def run(self):
        return "This is a Base Task Class not runnable"

As you can read on the comments the __call__ method is called before the task execution.
Here we call the constructor of our sanbox but we don’t initialize it because maybe not all our tasks needs sandbox ;-) .
For the same reason in the method after_return, called form celery at the end of a task whatever is the status (RETRY,FAILURE, ecc.), we will clean the sandbox only if we used it.
The settings module that you see in my general task is the django settings that I use for store conf vars from my task like the PATH_SANDBOX the radix folder for your sandbox.

Now you can write a task by extend the base task that we created and maybe it will look like this

from toforgeTask import toforgeTask

class sampleDownloader(toforgeTask):

    def run(self, album_id, **kwargs):
        self.logger.info("Download a test file %s " % (album_id))

        #: initialize tha sanbox
        self.sandbox.__enter__()

        #: download a file
        self.writeFileFromUrl(self,"http://www.toforge.com/notexistfile.zip",
                              "myfile")

        #: do some stuff with the file in the sandbox
        #: you can open files in the standard way just use
        #: the self.sandbox.path variable

        return """File downloaded, but you will not found it because at the
               end of the task the sandbox will be cleaned"""

Following you can see the source code of the protagonist of this post, the sandBox.py file.

import errno
import os
import shutil
import urllib2

class SandBox(object):
    """A simple sandbox context manager for easy create and clean of a
    sandbox folder plus some useful methods for file download and write

    Mauro Rocco

http://www.toforge.com

    """

    def __init__(self, name, id, prefix, debug = False):

        #: name of the folder, commonly the name of the task
        self.name = name

        #: pid of the task, the name of the end folder where the sandbox
        #: writes the files
        self.id = id

        #: the base path for your sandbox
        self.prefix = prefix

        #: te complete path used in the context
        self.path = None

        #: debug option for print some output information
        self.debug = debug

    def __enter__(self):
        self.path = os.path.join(self.prefix, self.name, self.id)

        try:
            os.makedirs(self.path)
        except OSError, exc:
            if exc.errno != errno.EEXIST:
                raise

        return self

    def __exit__(self, *exc_info):
        try:
            shutil.rmtree(self.path)
        except OSError, exc:
            if exc.errno != errno.ENOENT:
                raise

    def _debugPrint(self,message):
        if self.debug:
            print message

    def _createFile(self, fileName, mode, override = False):
        if not self.isInitialized():
            self.__enter__()

        if override:
            return open(os.path.join(self.path,fileName), mode)
        try:
            open(os.path.join(self.path,fileName))
        except IOError, exc:
            if exc.errno == errno.ENOENT:
                return open(os.path.join(self.path,fileName), mode)
            raise
        raise Exception("File already exist in sandbox: %s "
                        % (os.path.join(self.path,fileName)))

    def isInitialized(self):
        """Check if the sandbox is initialized"""
        if self.path:
            return True
        return False

    def getSandboxContent(self):
        """Return a list of all folders and files
        in the current sandbox folder"""
        if not self.isInitialized():
            self.__enter__()
        tree = []
        for dirname, dirnames, filenames in os.walk(self.path):
            for subdirname in dirnames:
                tree.append(os.path.join(dirname, subdirname))
            for filename in filenames:
                tree.append(os.path.join(dirname, filename))
        return tree

    def writeFile(self,content, fileName, mode = "w", override = False):
        """write a file in the sandbox folder"""
        f = self._createFile(fileName, mode, override)
        f.write(str(content))
        f.close()

    def writeFileFromUrl(self,url,fileName, override = False, httpproxy=None,
                         ftpproxy=None, gopherproxy=None):
        """Download a file from the given url  and write it
         on the sandbox with the given name"""
        proxies={}

        if httpproxy:
            proxies["http"]=httpproxy

        if ftpproxy:
            proxies["ftp"]=ftpproxy

        if gopherproxy:
            proxies["gopher"]=gopherproxy

        outfile=self._createFile(fileName, "wb", override)

        proxy = urllib2.ProxyHandler(proxies)
        opener = urllib2.build_opener(proxy)
        urllib2.install_opener(opener)

        instream = urllib2.urlopen(url)
        length = instream.info().getheader("Content-Length")

        if length==None:
            length="?"

        self._debugPrint("Downloading %s (%s bytes) ..."
                         % (instream.url, length))

        bytesRead=0.0

        for line in instream:
            bytesRead+=len(line)
            outfile.write(line)

            if length!="?":
                self._debugPrint("Downloading... %.02f/%.02f kb (%d%%)"
                                 % (fileName,
                                    bytesRead/1024.0,
                                    length/1024.0,
                                    100*bytesRead/length))

        instream.close()
        outfile.close()
        self._debugPrint("File downloaded successful")

Enjoy :-)