At the end of this post you will find the source code of a Python sandbox that I wrote in order to have a general-purpose sandbox that works for all my Celery tasks.
Anyway, this is general Python code and you can also use it without Celery, but here I will also give you some examples of how to use it quickly and easily with Celery.
The sandbox is written as a context manager (at the suggestion of asksol) with an entry point that creates the folders and an exit point that cleans them up.
The class creates a folder structure like this:
/sandboxradix/tasks.mytask.dosomething/taskpid/
At the exit point only the taskpid folder is erased; this way you can launch several tasks of the same type at the same time.
Let’s see how you can use it for your Celery tasks.
In my personal opinion, the best way to do it is to create a base class for all your tasks that extends the Celery Task class and implements some generally useful methods, such as the sandbox behaviour.
Below you can see an example of a general base class that uses my sandbox context manager.
import settings
from celery.task import Task
from sandBox import SandBox
class toforgeTask(Task):
    """Base Celery task that gives every subclass a lazily created sandbox.

    ``__call__`` only builds the ``SandBox`` object; no folder is created
    until a subclass enters the sandbox (or writes a file through it), so
    tasks that never touch the sandbox pay nothing for it.
    """

    def __call__(self, *args, **kwargs):
        """In celery task this function call the run method, here you can
        set some environment variable before the run of the task"""
        self.sandbox = SandBox(self.name, self.request.id,
                               settings.PATH_SANDBOX)
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        """Remove the sandbox folder, if one was created, once the task ends."""
        #: ``sandbox`` only exists once __call__ has run; look it up
        #: defensively so this handler can never fail with AttributeError
        sandbox = getattr(self, "sandbox", None)
        #: If the sandbox is initialized we clean it
        if sandbox is not None and sandbox.isInitialized():
            #: follow the context-manager protocol: three ``None`` values
            #: mean "leaving without an exception"
            sandbox.__exit__(None, None, None)

    def run(self):
        """Placeholder body; concrete tasks override this method."""
        return "This is a Base Task Class not runnable"
As you can read in the comments, the __call__ method is called before the task execution.
Here we call the constructor of our sandbox, but we don’t initialize it because maybe not all our tasks need a sandbox.
For the same reason, in the after_return method, which Celery calls at the end of a task whatever its status (RETRY, FAILURE, etc.), we clean the sandbox only if we used it.
The settings module that you see in my general task is the Django settings module, which I use to store configuration variables for my tasks, such as PATH_SANDBOX, the root folder for your sandbox.
Now you can write a task by extending the base task that we created; it might look like this:
from toforgeTask import toforgeTask
class sampleDownloader(toforgeTask):
    """Example task that downloads a file into its own sandbox folder."""

    def run(self, album_id, **kwargs):
        """Download a test file into the sandbox and return a status message.

        The sandbox object is prepared by ``toforgeTask.__call__`` and is
        removed again by ``toforgeTask.after_return``.
        """
        self.logger.info("Download a test file %s " % (album_id))
        #: initialize the sandbox
        self.sandbox.__enter__()
        #: download a file; writeFileFromUrl is a method of the sandbox,
        #: not of the task, so it must be called through self.sandbox
        self.sandbox.writeFileFromUrl(
            "http://www.toforge.com/notexistfile.zip", "myfile")
        #: do some stuff with the file in the sandbox
        #: you can open files in the standard way just use
        #: the self.sandbox.path variable
        return ("File downloaded, but you will not found it because at the\n"
                "end of the task the sandbox will be cleaned")
Below you can see the source code of the protagonist of this post, the sandBox.py file.
import errno
import os
import shutil
import urllib2
class SandBox(object):
    """A simple sandbox context manager for easy create and clean of a
    sandbox folder plus some useful methods for file download and write

    The folder used is ``<prefix>/<name>/<id>``.  Entering the context
    creates it; leaving the context removes only the ``<id>`` folder, so
    several sandboxes with the same name can coexist.

    Mauro Rocco
    http://www.toforge.com
    """

    #: number of bytes read from the remote stream per iteration
    _CHUNK_SIZE = 8192

    def __init__(self, name, id, prefix, debug=False):
        #: name of the folder, commonly the name of the task
        self.name = name
        #: pid of the task, the name of the end folder where the sandbox
        #: writes the files
        self.id = id
        #: the base path for your sandbox
        self.prefix = prefix
        #: the complete path used in the context; None until initialized
        self.path = None
        #: debug option for print some output information
        self.debug = debug

    def __enter__(self):
        """Create the sandbox folder (if missing) and return the sandbox."""
        #: "%s" formatting lets non-string ids (e.g. an integer pid) be used
        target = os.path.join(self.prefix, self.name, "%s" % (self.id,))
        try:
            os.makedirs(target)
        except OSError as exc:
            #: an already existing folder is fine, anything else is not
            if exc.errno != errno.EEXIST:
                raise
        #: only mark the sandbox as initialized once the folder exists
        self.path = target
        return self

    def __exit__(self, *exc_info):
        """Remove the sandbox folder; exceptions from the body propagate."""
        if not self.isInitialized():
            #: nothing was created, so there is nothing to clean
            return
        try:
            shutil.rmtree(self.path)
        except OSError as exc:
            #: a folder that is already gone counts as cleaned
            if exc.errno != errno.ENOENT:
                raise
        #: back to the uninitialized state so the sandbox can be reused
        self.path = None

    def _debugPrint(self, message):
        """Print ``message`` only when the debug option is enabled."""
        if self.debug:
            print(message)

    def _createFile(self, fileName, mode, override=False):
        """Open ``fileName`` inside the sandbox with the given ``mode``.

        The sandbox is initialized on demand.  Unless ``override`` is true
        an Exception is raised when the file already exists.
        """
        if not self.isInitialized():
            self.__enter__()
        target = os.path.join(self.path, fileName)
        #: test for existence without opening the file, so no handle leaks
        if not override and os.path.exists(target):
            raise Exception("File already exist in sandbox: %s "
                            % (target))
        return open(target, mode)

    def isInitialized(self):
        """Check if the sandbox is initialized"""
        return bool(self.path)

    def getSandboxContent(self):
        """Return a list of all folders and files
        in the current sandbox folder"""
        if not self.isInitialized():
            self.__enter__()
        tree = []
        for dirname, dirnames, filenames in os.walk(self.path):
            #: folders first, then files, for every visited directory
            tree.extend(os.path.join(dirname, sub) for sub in dirnames)
            tree.extend(os.path.join(dirname, fn) for fn in filenames)
        return tree

    def writeFile(self, content, fileName, mode="w", override=False):
        """write a file in the sandbox folder"""
        f = self._createFile(fileName, mode, override)
        try:
            f.write(str(content))
        finally:
            #: close the file even when the write fails
            f.close()

    @staticmethod
    def _parseLength(header):
        """Return the Content-Length header as an int, or None if unusable."""
        try:
            return int(header)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _progressMessage(bytesRead, length):
        """Build the progress line for ``bytesRead`` out of ``length`` bytes."""
        return ("Downloading... %.02f/%.02f kb (%d%%)"
                % (bytesRead / 1024.0,
                   length / 1024.0,
                   100.0 * bytesRead / length))

    def writeFileFromUrl(self, url, fileName, override=False, httpproxy=None,
                         ftpproxy=None, gopherproxy=None):
        """Download a file from the given url and write it
        on the sandbox with the given name

        The optional proxy arguments are passed to the proxy handler for
        their scheme.  An Exception is raised when the file already exists
        and ``override`` is false.
        """
        proxies = {}
        if httpproxy:
            proxies["http"] = httpproxy
        if ftpproxy:
            proxies["ftp"] = ftpproxy
        if gopherproxy:
            proxies["gopher"] = gopherproxy
        outfile = self._createFile(fileName, "wb", override)
        try:
            #: use a private opener instead of install_opener(), so the
            #: proxy settings of this download do not leak into every
            #: other urllib2 user of the process
            opener = urllib2.build_opener(urllib2.ProxyHandler(proxies))
            instream = opener.open(url)
            try:
                length = self._parseLength(
                    instream.info().getheader("Content-Length"))
                shownLength = "?" if length is None else length
                self._debugPrint("Downloading %s (%s bytes) ..."
                                 % (instream.url, shownLength))
                bytesRead = 0
                #: fixed-size chunks: "lines" of binary data are unbounded
                while True:
                    chunk = instream.read(self._CHUNK_SIZE)
                    if not chunk:
                        break
                    bytesRead += len(chunk)
                    outfile.write(chunk)
                    #: no percentage without a known, non-zero length
                    if length and self.debug:
                        self._debugPrint(
                            self._progressMessage(bytesRead, length))
            finally:
                instream.close()
        finally:
            outfile.close()
        self._debugPrint("File downloaded successful")
Enjoy
