mirror of
				https://github.com/xcat2/xcat-core.git
				synced 2025-10-27 17:35:33 +00:00 
			
		
		
		
	git-svn-id: https://svn.code.sf.net/p/xcat/code/xcat-core/trunk@7609 8638fb3e-16cb-4fca-ae20-7b5d299a9bcd
		
			
				
	
	
		
			1357 lines
		
	
	
		
			50 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1357 lines
		
	
	
		
			50 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| #
 | |
| # vim:set et ts=4 fdc=0 fdn=2 fdl=0:
 | |
| #
 | |
| # There are no blank lines between blocks beacause i use folding from:
 | |
| # http://www.vim.org/scripts/script.php?script_id=515
 | |
| #
 | |
| 
 | |
| """= QWeb Framework =
 | |
| 
 | |
| == What is QWeb ? ==
 | |
| 
 | |
| QWeb is a python based [http://www.python.org/doc/peps/pep-0333/ WSGI]
 | |
| compatible web framework, it provides an infratructure to quickly build web
 | |
| applications consisting of:
 | |
| 
 | |
|  * A lightweight request handler (QWebRequest)
 | |
|  * An xml templating engine (QWebXml and QWebHtml)
 | |
|  * A simple name based controler (qweb_control)
 | |
|  * A standalone WSGI Server (QWebWSGIServer)
 | |
|  * A cgi and fastcgi WSGI wrapper (taken from flup)
 | |
|  * A startup function that starts cgi, factgi or standalone according to the
 | |
|    evironement (qweb_autorun).
 | |
| 
 | |
| QWeb applications are runnable in standalone mode (from commandline), via
 | |
| FastCGI, Regular CGI or by any python WSGI compliant server.
 | |
| 
 | |
| QWeb doesn't provide any database access but it integrates nicely with ORMs
 | |
| such as SQLObject, SQLAlchemy or plain DB-API.
 | |
| 
 | |
| Written by Antony Lesuisse (email al AT udev.org)
 | |
| 
 | |
| Homepage: http://antony.lesuisse.org/qweb/trac/
 | |
| 
 | |
| Forum: [http://antony.lesuisse.org/qweb/forum/viewforum.php?id=1 Forum]
 | |
| 
 | |
| == Quick Start (for Linux, MacOS X and cygwin) ==
 | |
| 
 | |
| Make sure you have at least python 2.3 installed and run the following commands:
 | |
| 
 | |
| {{{
 | |
| $ wget http://antony.lesuisse.org/qweb/files/QWeb-0.7.tar.gz
 | |
| $ tar zxvf QWeb-0.7.tar.gz
 | |
| $ cd QWeb-0.7/examples/blog
 | |
| $ ./blog.py
 | |
| }}}
 | |
| 
 | |
| And point your browser to http://localhost:8080/
 | |
| 
 | |
| You may also try AjaxTerm which uses qweb request handler.
 | |
| 
 | |
| == Download ==
 | |
| 
 | |
|  * Version 0.7:
 | |
|    * Source [/qweb/files/QWeb-0.7.tar.gz QWeb-0.7.tar.gz]
 | |
|    * Python 2.3 Egg [/qweb/files/QWeb-0.7-py2.3.egg QWeb-0.7-py2.3.egg]
 | |
|    * Python 2.4 Egg [/qweb/files/QWeb-0.7-py2.4.egg QWeb-0.7-py2.4.egg]
 | |
| 
 | |
|  * [/qweb/trac/browser Browse the source repository]
 | |
| 
 | |
| == Documentation ==
 | |
| 
 | |
|  * [/qweb/trac/browser/trunk/README.txt?format=raw Read the included documentation] 
 | |
|  * QwebTemplating
 | |
| 
 | |
| == Mailin-list ==
 | |
| 
 | |
|  * Forum: [http://antony.lesuisse.org/qweb/forum/viewforum.php?id=1 Forum]
 | |
|  * No mailing-list exists yet, discussion should happen on: [http://mail.python.org/mailman/listinfo/web-sig web-sig] [http://mail.python.org/pipermail/web-sig/ archives]
 | |
| 
 | |
| QWeb Components:
 | |
| ----------------
 | |
| 
 | |
| QWeb also feature a simple components api, that enables developers to easily
 | |
| produces reusable components.
 | |
| 
 | |
| Default qweb components:
 | |
| 
 | |
|     - qweb_static:
 | |
|         A qweb component to serve static content from the filesystem or from
 | |
|         zipfiles.
 | |
| 
 | |
|     - qweb_dbadmin:
 | |
|         scaffolding for sqlobject
 | |
| 
 | |
| License
 | |
| -------
 | |
| qweb/fcgi.py wich is BSD-like from saddi.com.
 | |
| Everything else is put in the public domain.
 | |
| 
 | |
| 
 | |
| TODO
 | |
| ----
 | |
|     Announce QWeb to python-announce-list@python.org web-sig@python.org
 | |
|     qweb_core
 | |
|         rename request methods into
 | |
|             request_save_files
 | |
|             response_404
 | |
|             response_redirect
 | |
|             response_download
 | |
|         request callback_generator, callback_function ?
 | |
|         wsgi callback_server_local
 | |
|         xml tags explicitly call render_attributes(t_att)?
 | |
|         priority form-checkbox over t-value (for t-option)
 | |
| 
 | |
| """
 | |
| 
 | |
| import BaseHTTPServer,SocketServer,Cookie
 | |
| import cgi,datetime,email,email.Message,errno,gzip,os,random,re,socket,sys,tempfile,time,types,urllib,urlparse,xml.dom
 | |
| try:
 | |
|     import cPickle as pickle
 | |
| except ImportError:
 | |
|     import pickle
 | |
| try:
 | |
|     import cStringIO as StringIO
 | |
| except ImportError:
 | |
|     import StringIO
 | |
| 
 | |
| #----------------------------------------------------------
 | |
| # Qweb Xml t-raw t-esc t-if t-foreach t-set t-call t-trim
 | |
| #----------------------------------------------------------
 | |
| class QWebEval:
 | |
|     def __init__(self,data):
 | |
|         self.data=data
 | |
|     def __getitem__(self,expr):
 | |
|         if self.data.has_key(expr):
 | |
|             return self.data[expr]
 | |
|         r=None
 | |
|         try:
 | |
|             r=eval(expr,self.data)
 | |
|         except NameError,e:
 | |
|             pass
 | |
|         except AttributeError,e:
 | |
|             pass
 | |
|         except Exception,e:
 | |
|             print "qweb: expression error '%s' "%expr,e
 | |
|         if self.data.has_key("__builtins__"):
 | |
|             del self.data["__builtins__"]
 | |
|         return r
 | |
|     def eval_object(self,expr):
 | |
|         return self[expr]
 | |
|     def eval_str(self,expr):
 | |
|         if expr=="0":
 | |
|             return self.data[0]
 | |
|         if isinstance(self[expr],unicode):
 | |
|             return self[expr].encode("utf8")
 | |
|         return str(self[expr])
 | |
|     def eval_format(self,expr):
 | |
|         try:
 | |
|             return str(expr%self)
 | |
|         except:
 | |
|             return "qweb: format error '%s' "%expr
 | |
| #       if isinstance(r,unicode):
 | |
| #           return r.encode("utf8")
 | |
|     def eval_bool(self,expr):
 | |
|         if self.eval_object(expr):
 | |
|             return 1
 | |
|         else:
 | |
|             return 0
 | |
| class QWebXml:
 | |
|     """QWeb Xml templating engine
 | |
|     
 | |
|     The templating engine use a very simple syntax, "magic" xml attributes, to
 | |
|     produce any kind of texutal output (even non-xml).
 | |
|     
 | |
|     QWebXml:
 | |
|         the template engine core implements the basic magic attributes:
 | |
|     
 | |
|         t-att t-raw t-esc t-if t-foreach t-set t-call t-trim
 | |
|     
 | |
|     """
 | |
|     def __init__(self,x=None,zipname=None):
 | |
|         self.node=xml.dom.Node
 | |
|         self._t={}
 | |
|         self._render_tag={}
 | |
|         prefix='render_tag_'
 | |
|         for i in [j for j in dir(self) if j.startswith(prefix)]:
 | |
|             name=i[len(prefix):].replace('_','-')
 | |
|             self._render_tag[name]=getattr(self.__class__,i)
 | |
| 
 | |
|         self._render_att={}
 | |
|         prefix='render_att_'
 | |
|         for i in [j for j in dir(self) if j.startswith(prefix)]:
 | |
|             name=i[len(prefix):].replace('_','-')
 | |
|             self._render_att[name]=getattr(self.__class__,i)
 | |
| 
 | |
|         if x!=None:
 | |
|             if zipname!=None:
 | |
|                 import zipfile
 | |
|                 zf=zipfile.ZipFile(zipname, 'r')
 | |
|                 self.add_template(zf.read(x))
 | |
|             else:
 | |
|                 self.add_template(x)
 | |
|     def register_tag(self,tag,func):
 | |
|         self._render_tag[tag]=func
 | |
|     def add_template(self,x):
 | |
|         if hasattr(x,'documentElement'):
 | |
|             dom=x
 | |
|         elif x.startswith("<?xml"):
 | |
|             import xml.dom.minidom
 | |
|             dom=xml.dom.minidom.parseString(x)
 | |
|         else:
 | |
|             import xml.dom.minidom
 | |
|             dom=xml.dom.minidom.parse(x)
 | |
|         for n in dom.documentElement.childNodes:
 | |
|             if n.nodeName=="t":
 | |
|                 self._t[str(n.getAttribute("t-name"))]=n
 | |
|     def get_template(self,name):
 | |
|         return self._t[name]
 | |
| 
 | |
|     def eval_object(self,expr,v):
 | |
|         return QWebEval(v).eval_object(expr)
 | |
|     def eval_str(self,expr,v):
 | |
|         return QWebEval(v).eval_str(expr)
 | |
|     def eval_format(self,expr,v):
 | |
|         return QWebEval(v).eval_format(expr)
 | |
|     def eval_bool(self,expr,v):
 | |
|         return QWebEval(v).eval_bool(expr)
 | |
| 
 | |
|     def render(self,tname,v={},out=None):
 | |
|         if self._t.has_key(tname):
 | |
|             return self.render_node(self._t[tname],v)
 | |
|         else:
 | |
|             return 'qweb: template "%s" not found'%tname
 | |
|     def render_node(self,e,v):
 | |
|         r=""
 | |
|         if e.nodeType==self.node.TEXT_NODE or e.nodeType==self.node.CDATA_SECTION_NODE:
 | |
|             r=e.data.encode("utf8")
 | |
|         elif e.nodeType==self.node.ELEMENT_NODE:
 | |
|             pre=""
 | |
|             g_att=""
 | |
|             t_render=None
 | |
|             t_att={}
 | |
|             for (an,av) in e.attributes.items():
 | |
|                 an=str(an)
 | |
|                 if isinstance(av,types.UnicodeType):
 | |
|                     av=av.encode("utf8")
 | |
|                 else:
 | |
|                     av=av.nodeValue.encode("utf8")
 | |
|                 if an.startswith("t-"):
 | |
|                     for i in self._render_att:
 | |
|                         if an[2:].startswith(i):
 | |
|                             g_att+=self._render_att[i](self,e,an,av,v)
 | |
|                             break
 | |
|                     else:
 | |
|                         if self._render_tag.has_key(an[2:]):
 | |
|                             t_render=an[2:]
 | |
|                         t_att[an[2:]]=av
 | |
|                 else:
 | |
|                     g_att+=' %s="%s"'%(an,cgi.escape(av,1));
 | |
|             if t_render:
 | |
|                 if self._render_tag.has_key(t_render):
 | |
|                     r=self._render_tag[t_render](self,e,t_att,g_att,v)
 | |
|             else:
 | |
|                 r=self.render_element(e,g_att,v,pre,t_att.get("trim",0))
 | |
|         return r
 | |
|     def render_element(self,e,g_att,v,pre="",trim=0):
 | |
|         g_inner=[]
 | |
|         for n in e.childNodes:
 | |
|             g_inner.append(self.render_node(n,v))
 | |
|         name=str(e.nodeName)
 | |
|         inner="".join(g_inner)
 | |
|         if trim==0:
 | |
|             pass
 | |
|         elif trim=='left':
 | |
|             inner=inner.lstrip()
 | |
|         elif trim=='right':
 | |
|             inner=inner.rstrip()
 | |
|         elif trim=='both':
 | |
|             inner=inner.strip()
 | |
|         if name=="t":
 | |
|             return inner
 | |
|         elif len(inner):
 | |
|             return "<%s%s>%s%s</%s>"%(name,g_att,pre,inner,name)
 | |
|         else:
 | |
|             return "<%s%s/>"%(name,g_att)
 | |
| 
 | |
|     # Attributes
 | |
|     def render_att_att(self,e,an,av,v):
 | |
|         if an.startswith("t-attf-"):
 | |
|             att,val=an[7:],self.eval_format(av,v)
 | |
|         elif an.startswith("t-att-"):
 | |
|             att,val=(an[6:],self.eval_str(av,v))
 | |
|         else:
 | |
|             att,val=self.eval_object(av,v)
 | |
|         return ' %s="%s"'%(att,cgi.escape(val,1))
 | |
| 
 | |
|     # Tags
 | |
|     def render_tag_raw(self,e,t_att,g_att,v):
 | |
|         return self.eval_str(t_att["raw"],v)
 | |
|     def render_tag_rawf(self,e,t_att,g_att,v):
 | |
|         return self.eval_format(t_att["rawf"],v)
 | |
|     def render_tag_esc(self,e,t_att,g_att,v):
 | |
|         return cgi.escape(self.eval_str(t_att["esc"],v))
 | |
|     def render_tag_escf(self,e,t_att,g_att,v):
 | |
|         return cgi.escape(self.eval_format(t_att["escf"],v))
 | |
|     def render_tag_foreach(self,e,t_att,g_att,v):
 | |
|         expr=t_att["foreach"]
 | |
|         enum=self.eval_object(expr,v)
 | |
|         if enum!=None:
 | |
|             var=t_att.get('as',expr).replace('.','_')
 | |
|             d=v.copy()
 | |
|             size=-1
 | |
|             if isinstance(enum,types.ListType):
 | |
|                 size=len(enum)
 | |
|             elif isinstance(enum,types.TupleType):
 | |
|                 size=len(enum)
 | |
|             elif hasattr(enum,'count'):
 | |
|                 size=enum.count()
 | |
|             d["%s_size"%var]=size
 | |
|             d["%s_all"%var]=enum
 | |
|             index=0
 | |
|             ru=[]
 | |
|             for i in enum:
 | |
|                 d["%s_value"%var]=i
 | |
|                 d["%s_index"%var]=index
 | |
|                 d["%s_first"%var]=index==0
 | |
|                 d["%s_even"%var]=index%2
 | |
|                 d["%s_odd"%var]=(index+1)%2
 | |
|                 d["%s_last"%var]=index+1==size
 | |
|                 if index%2:
 | |
|                     d["%s_parity"%var]='odd'
 | |
|                 else:
 | |
|                     d["%s_parity"%var]='even'
 | |
|                 if isinstance(i,types.DictType):
 | |
|                     d.update(i)
 | |
|                 else:
 | |
|                     d[var]=i
 | |
|                 ru.append(self.render_element(e,g_att,d))
 | |
|                 index+=1
 | |
|             return "".join(ru)
 | |
|         else:
 | |
|             return "qweb: t-foreach %s not found."%expr
 | |
|     def render_tag_if(self,e,t_att,g_att,v):
 | |
|         if self.eval_bool(t_att["if"],v):
 | |
|             return self.render_element(e,g_att,v)
 | |
|         else:
 | |
|             return ""
 | |
|     def render_tag_call(self,e,t_att,g_att,v):
 | |
|         # TODO t-prefix
 | |
|         if t_att.has_key("import"):
 | |
|             d=v
 | |
|         else:
 | |
|             d=v.copy()
 | |
|         d[0]=self.render_element(e,g_att,d)
 | |
|         return self.render(t_att["call"],d)
 | |
|     def render_tag_set(self,e,t_att,g_att,v):
 | |
|         if t_att.has_key("eval"):
 | |
|             v[t_att["set"]]=self.eval_object(t_att["eval"],v)
 | |
|         else:
 | |
|             v[t_att["set"]]=self.render_element(e,g_att,v)
 | |
|         return ""
 | |
| 
 | |
| #----------------------------------------------------------
 | |
| # QWeb HTML (+deprecated QWebFORM and QWebOLD)
 | |
| #----------------------------------------------------------
 | |
| class QWebURL:
 | |
|     """ URL helper
 | |
|     assert req.PATH_INFO== "/site/admin/page_edit"
 | |
|     u = QWebURL(root_path="/site/",req_path=req.PATH_INFO)
 | |
|     s=u.url2_href("user/login",{'a':'1'})
 | |
|     assert s=="../user/login?a=1"
 | |
|     
 | |
|     """
 | |
|     def __init__(self, root_path="/", req_path="/",defpath="",defparam={}):
 | |
|         self.defpath=defpath
 | |
|         self.defparam=defparam
 | |
|         self.root_path=root_path
 | |
|         self.req_path=req_path
 | |
|         self.req_list=req_path.split("/")[:-1]
 | |
|         self.req_len=len(self.req_list)
 | |
|     def decode(self,s):
 | |
|         h={}
 | |
|         for k,v in cgi.parse_qsl(s,1):
 | |
|             h[k]=v
 | |
|         return h
 | |
|     def encode(self,h):
 | |
|         return urllib.urlencode(h.items())
 | |
|     def request(self,req):
 | |
|         return req.REQUEST
 | |
|     def copy(self,path=None,param=None):
 | |
|         npath=self.defpath
 | |
|         if path:
 | |
|             npath=path
 | |
|         nparam=self.defparam.copy()
 | |
|         if param:
 | |
|             nparam.update(param)
 | |
|         return QWebURL(self.root_path,self.req_path,npath,nparam)
 | |
|     def path(self,path=''):
 | |
|         if not path:
 | |
|             path=self.defpath
 | |
|         pl=(self.root_path+path).split('/')
 | |
|         i=0
 | |
|         for i in range(min(len(pl), self.req_len)):
 | |
|             if pl[i]!=self.req_list[i]:
 | |
|                 break
 | |
|         else:
 | |
|             i+=1
 | |
|         dd=self.req_len-i
 | |
|         if dd<0:
 | |
|             dd=0
 | |
|         return '/'.join(['..']*dd+pl[i:])
 | |
|     def href(self,path='',arg={}):
 | |
|         p=self.path(path)
 | |
|         tmp=self.defparam.copy()
 | |
|         tmp.update(arg)
 | |
|         s=self.encode(tmp)
 | |
|         if len(s):
 | |
|             return p+"?"+s
 | |
|         else:
 | |
|             return p
 | |
|     def form(self,path='',arg={}):
 | |
|         p=self.path(path)
 | |
|         tmp=self.defparam.copy()
 | |
|         tmp.update(arg)
 | |
|         r=''.join(['<input type="hidden" name="%s" value="%s"/>'%(k,cgi.escape(str(v),1)) for k,v in tmp.items()])
 | |
|         return (p,r)
 | |
| class QWebField:
 | |
|     def __init__(self,name=None,default="",check=None):
 | |
|         self.name=name
 | |
|         self.default=default
 | |
|         self.check=check
 | |
|         # optional attributes
 | |
|         self.type=None
 | |
|         self.trim=1
 | |
|         self.required=1
 | |
|         self.cssvalid="form_valid"
 | |
|         self.cssinvalid="form_invalid"
 | |
|         # set by addfield
 | |
|         self.form=None
 | |
|         # set by processing
 | |
|         self.input=None
 | |
|         self.css=None
 | |
|         self.value=None
 | |
|         self.valid=None
 | |
|         self.invalid=None
 | |
|         self.validate(1)
 | |
|     def validate(self,val=1,update=1):
 | |
|         if val:
 | |
|             self.valid=1
 | |
|             self.invalid=0
 | |
|             self.css=self.cssvalid
 | |
|         else:
 | |
|             self.valid=0
 | |
|             self.invalid=1
 | |
|             self.css=self.cssinvalid
 | |
|         if update and self.form:
 | |
|             self.form.update()
 | |
|     def invalidate(self,update=1):
 | |
|         self.validate(0,update)
 | |
| class QWebForm:
 | |
|     class QWebFormF:
 | |
|         pass
 | |
|     def __init__(self,e=None,arg=None,default=None):
 | |
|         self.fields={}
 | |
|         # all fields have been submitted
 | |
|         self.submitted=False
 | |
|         self.missing=[]
 | |
|         # at least one field is invalid or missing
 | |
|         self.invalid=False
 | |
|         self.error=[]
 | |
|         # all fields have been submitted and are valid
 | |
|         self.valid=False
 | |
|         # fields under self.f for convenience
 | |
|         self.f=self.QWebFormF()
 | |
|         if e:
 | |
|             self.add_template(e)
 | |
|         # assume that the fields are done with the template
 | |
|         if default:
 | |
|             self.set_default(default,e==None)
 | |
|         if arg!=None:
 | |
|             self.process_input(arg)
 | |
|     def __getitem__(self,k):
 | |
|         return self.fields[k]
 | |
|     def set_default(self,default,add_missing=1):
 | |
|         for k,v in default.items():
 | |
|             if self.fields.has_key(k):
 | |
|                 self.fields[k].default=str(v)
 | |
|             elif add_missing:
 | |
|                 self.add_field(QWebField(k,v))
 | |
|     def add_field(self,f):
 | |
|         self.fields[f.name]=f
 | |
|         f.form=self
 | |
|         setattr(self.f,f.name,f)
 | |
|     def add_template(self,e):
 | |
|         att={}
 | |
|         for (an,av) in e.attributes.items():
 | |
|             an=str(an)
 | |
|             if an.startswith("t-"):
 | |
|                 att[an[2:]]=av.encode("utf8")
 | |
|         for i in ["form-text", "form-password", "form-radio", "form-checkbox", "form-select","form-textarea"]:
 | |
|             if att.has_key(i):
 | |
|                 name=att[i].split(".")[-1]
 | |
|                 default=att.get("default","")
 | |
|                 check=att.get("check",None)
 | |
|                 f=QWebField(name,default,check)
 | |
|                 if i=="form-textarea":
 | |
|                     f.type="textarea"
 | |
|                     f.trim=0
 | |
|                 if i=="form-checkbox":
 | |
|                     f.type="checkbox"
 | |
|                     f.required=0
 | |
|                 self.add_field(f)
 | |
|         for n in e.childNodes:
 | |
|             if n.nodeType==n.ELEMENT_NODE:
 | |
|                 self.add_template(n)
 | |
|     def process_input(self,arg):
 | |
|         for f in self.fields.values():
 | |
|             if arg.has_key(f.name):
 | |
|                 f.input=arg[f.name]
 | |
|                 f.value=f.input
 | |
|                 if f.trim:
 | |
|                     f.input=f.input.strip()
 | |
|                 f.validate(1,False)
 | |
|                 if f.check==None:
 | |
|                     continue
 | |
|                 elif callable(f.check):
 | |
|                     pass
 | |
|                 elif isinstance(f.check,str):
 | |
|                     v=f.check
 | |
|                     if f.check=="email":
 | |
|                         v=r"/^[^@#!& ]+@[A-Za-z0-9-][.A-Za-z0-9-]{0,64}\.[A-Za-z]{2,5}$/"
 | |
|                     if f.check=="date":
 | |
|                         v=r"/^(19|20)\d\d-(0[1-9]|1[012])-(0[1-9]|[12][0-9]|3[01])$/"
 | |
|                     if not re.match(v[1:-1],f.input):
 | |
|                         f.validate(0,False)
 | |
|             else:
 | |
|                 f.value=f.default
 | |
|         self.update()
 | |
|     def validate_all(self,val=1):
 | |
|         for f in self.fields.values():
 | |
|             f.validate(val,0)
 | |
|         self.update()
 | |
|     def invalidate_all(self):
 | |
|         self.validate_all(0)
 | |
|     def update(self):
 | |
|         self.submitted=True
 | |
|         self.valid=True
 | |
|         self.errors=[]
 | |
|         for f in self.fields.values():
 | |
|             if f.required and f.input==None:
 | |
|                 self.submitted=False
 | |
|                 self.valid=False
 | |
|                 self.missing.append(f.name)
 | |
|             if f.invalid:
 | |
|                 self.valid=False
 | |
|                 self.error.append(f.name)
 | |
|         # invalid have been submitted and 
 | |
|         self.invalid=self.submitted and self.valid==False
 | |
|     def collect(self):
 | |
|         d={}
 | |
|         for f in self.fields.values():
 | |
|             d[f.name]=f.value
 | |
|         return d
 | |
| class QWebURLEval(QWebEval):
 | |
|     def __init__(self,data):
 | |
|         QWebEval.__init__(self,data)
 | |
|     def __getitem__(self,expr):
 | |
|         r=QWebEval.__getitem__(self,expr)
 | |
|         if isinstance(r,str):
 | |
|             return urllib.quote_plus(r)
 | |
|         else:
 | |
|             return r
 | |
| class QWebHtml(QWebXml):
 | |
|     """QWebHtml
 | |
|     QWebURL:
 | |
|     QWebField:
 | |
|     QWebForm:
 | |
|     QWebHtml:
 | |
|         an extended template engine, with a few utility class to easily produce
 | |
|         HTML, handle URLs and process forms, it adds the following magic attributes:
 | |
|     
 | |
|         t-href t-action t-form-text t-form-password t-form-textarea t-form-radio
 | |
|         t-form-checkbox t-form-select t-option t-selected t-checked t-pager
 | |
|     
 | |
|     # explication URL:
 | |
|     # v['tableurl']=QWebUrl({p=afdmin,saar=,orderby=,des=,mlink;meta_active=})
 | |
|     # t-href="tableurl?desc=1"
 | |
|     #
 | |
|     # explication FORM: t-if="form.valid()"
 | |
|     # Foreach i
 | |
|     #   email: <input type="text" t-esc-name="i" t-esc-value="form[i].value" t-esc-class="form[i].css"/>
 | |
|     #   <input type="radio" name="spamtype" t-esc-value="i" t-selected="i==form.f.spamtype.value"/>
 | |
|     #   <option t-esc-value="cc" t-selected="cc==form.f.country.value"><t t-esc="cname"></option>
 | |
|     # Simple forms:
 | |
|     #   <input t-form-text="form.email" t-check="email"/>
 | |
|     #   <input t-form-password="form.email" t-check="email"/>
 | |
|     #   <input t-form-radio="form.email" />
 | |
|     #   <input t-form-checkbox="form.email" />
 | |
|     #   <textarea t-form-textarea="form.email" t-check="email"/>
 | |
|     #   <select t-form-select="form.email"/>
 | |
|     #       <option t-value="1">
 | |
|     #   <input t-form-radio="form.spamtype" t-value="1"/> Cars
 | |
|     #   <input t-form-radio="form.spamtype" t-value="2"/> Sprt
 | |
|     """
 | |
|     # QWebForm from a template
 | |
|     def form(self,tname,arg=None,default=None):
 | |
|         form=QWebForm(self._t[tname],arg,default)
 | |
|         return form
 | |
| 
 | |
|     # HTML Att
 | |
|     def eval_url(self,av,v):
 | |
|         s=QWebURLEval(v).eval_format(av)
 | |
|         a=s.split('?',1)
 | |
|         arg={}
 | |
|         if len(a)>1:
 | |
|             for k,v in cgi.parse_qsl(a[1],1):
 | |
|                 arg[k]=v
 | |
|         b=a[0].split('/',1)
 | |
|         path=''
 | |
|         if len(b)>1:
 | |
|             path=b[1]
 | |
|         u=b[0]
 | |
|         return u,path,arg
 | |
|     def render_att_url_(self,e,an,av,v):
 | |
|         u,path,arg=self.eval_url(av,v)
 | |
|         if not isinstance(v.get(u,0),QWebURL):
 | |
|             out='qweb: missing url %r %r %r'%(u,path,arg)
 | |
|         else:
 | |
|             out=v[u].href(path,arg)
 | |
|         return ' %s="%s"'%(an[6:],cgi.escape(out,1))
 | |
|     def render_att_href(self,e,an,av,v):
 | |
|         return self.render_att_url_(e,"t-url-href",av,v)
 | |
|     def render_att_checked(self,e,an,av,v):
 | |
|         if self.eval_bool(av,v):
 | |
|             return ' %s="%s"'%(an[2:],an[2:])
 | |
|         else:
 | |
|             return ''
 | |
|     def render_att_selected(self,e,an,av,v):
 | |
|         return self.render_att_checked(e,an,av,v)
 | |
| 
 | |
|     # HTML Tags forms
 | |
|     def render_tag_rawurl(self,e,t_att,g_att,v):
 | |
|         u,path,arg=self.eval_url(t_att["rawurl"],v)
 | |
|         return v[u].href(path,arg)
 | |
|     def render_tag_escurl(self,e,t_att,g_att,v):
 | |
|         u,path,arg=self.eval_url(t_att["escurl"],v)
 | |
|         return cgi.escape(v[u].href(path,arg))
 | |
|     def render_tag_action(self,e,t_att,g_att,v):
 | |
|         u,path,arg=self.eval_url(t_att["action"],v)
 | |
|         if not isinstance(v.get(u,0),QWebURL):
 | |
|             action,input=('qweb: missing url %r %r %r'%(u,path,arg),'')
 | |
|         else:
 | |
|             action,input=v[u].form(path,arg)
 | |
|         g_att+=' action="%s"'%action
 | |
|         return self.render_element(e,g_att,v,input)
 | |
|     def render_tag_form_text(self,e,t_att,g_att,v):
 | |
|         f=self.eval_object(t_att["form-text"],v)
 | |
|         g_att+=' type="text" name="%s" value="%s" class="%s"'%(f.name,cgi.escape(f.value,1),f.css)
 | |
|         return self.render_element(e,g_att,v)
 | |
|     def render_tag_form_password(self,e,t_att,g_att,v):
 | |
|         f=self.eval_object(t_att["form-password"],v)
 | |
|         g_att+=' type="password" name="%s" value="%s" class="%s"'%(f.name,cgi.escape(f.value,1),f.css)
 | |
|         return self.render_element(e,g_att,v)
 | |
|     def render_tag_form_textarea(self,e,t_att,g_att,v):
 | |
|         type="textarea"
 | |
|         f=self.eval_object(t_att["form-textarea"],v)
 | |
|         g_att+=' name="%s" class="%s"'%(f.name,f.css)
 | |
|         r="<%s%s>%s</%s>"%(type,g_att,cgi.escape(f.value,1),type)
 | |
|         return r
 | |
|     def render_tag_form_radio(self,e,t_att,g_att,v):
 | |
|         f=self.eval_object(t_att["form-radio"],v)
 | |
|         val=t_att["value"]
 | |
|         g_att+=' type="radio" name="%s" value="%s"'%(f.name,val)
 | |
|         if f.value==val:
 | |
|             g_att+=' checked="checked"'
 | |
|         return self.render_element(e,g_att,v)
 | |
|     def render_tag_form_checkbox(self,e,t_att,g_att,v):
 | |
|         f=self.eval_object(t_att["form-checkbox"],v)
 | |
|         val=t_att["value"]
 | |
|         g_att+=' type="checkbox" name="%s" value="%s"'%(f.name,val)
 | |
|         if f.value==val:
 | |
|             g_att+=' checked="checked"'
 | |
|         return self.render_element(e,g_att,v)
 | |
|     def render_tag_form_select(self,e,t_att,g_att,v):
 | |
|         f=self.eval_object(t_att["form-select"],v)
 | |
|         g_att+=' name="%s" class="%s"'%(f.name,f.css)
 | |
|         return self.render_element(e,g_att,v)
 | |
|     def render_tag_option(self,e,t_att,g_att,v):
 | |
|         f=self.eval_object(e.parentNode.getAttribute("t-form-select"),v)
 | |
|         val=t_att["option"]
 | |
|         g_att+=' value="%s"'%(val)
 | |
|         if f.value==val:
 | |
|             g_att+=' selected="selected"'
 | |
|         return self.render_element(e,g_att,v)
 | |
| 
 | |
|     # HTML Tags others
 | |
|     def render_tag_pager(self,e,t_att,g_att,v):
 | |
|         pre=t_att["pager"]
 | |
|         total=int(self.eval_str(t_att["total"],v))
 | |
|         start=int(self.eval_str(t_att["start"],v))
 | |
|         step=int(self.eval_str(t_att.get("step","100"),v))
 | |
|         scope=int(self.eval_str(t_att.get("scope","5"),v))
 | |
|         # Compute Pager
 | |
|         p=pre+"_"
 | |
|         d={}
 | |
|         d[p+"tot_size"]=total
 | |
|         d[p+"tot_page"]=tot_page=total/step
 | |
|         d[p+"win_start0"]=total and start
 | |
|         d[p+"win_start1"]=total and start+1
 | |
|         d[p+"win_end0"]=max(0,min(start+step-1,total-1))
 | |
|         d[p+"win_end1"]=min(start+step,total)
 | |
|         d[p+"win_page0"]=win_page=start/step
 | |
|         d[p+"win_page1"]=win_page+1
 | |
|         d[p+"prev"]=(win_page!=0)
 | |
|         d[p+"prev_start"]=(win_page-1)*step
 | |
|         d[p+"next"]=(tot_page>=win_page+1)
 | |
|         d[p+"next_start"]=(win_page+1)*step
 | |
|         l=[]
 | |
|         begin=win_page-scope
 | |
|         end=win_page+scope
 | |
|         if begin<0:
 | |
|             end-=begin
 | |
|         if end>tot_page:
 | |
|             begin-=(end-tot_page)
 | |
|         i=max(0,begin)
 | |
|         while i<=min(end,tot_page) and total!=step:
 | |
|             l.append( { p+"page0":i, p+"page1":i+1, p+"start":i*step, p+"sel":(win_page==i) })
 | |
|             i+=1
 | |
|         d[p+"active"]=len(l)>1
 | |
|         d[p+"list"]=l
 | |
|         # Update v
 | |
|         v.update(d)
 | |
|         return ""
 | |
| 
 | |
| #----------------------------------------------------------
 | |
| # QWeb Simple Controller
 | |
| #----------------------------------------------------------
 | |
| def qweb_control(self,jump='main',p=[]):
 | |
|     """ qweb_control(self,jump='main',p=[]):
 | |
|     A simple function to handle the controler part of your application. It
 | |
|     dispatch the control to the jump argument, while ensuring that prefix
 | |
|     function have been called.
 | |
| 
 | |
|     qweb_control replace '/' to '_' and strip '_' from the jump argument.
 | |
| 
 | |
|     name1
 | |
|     name1_name2
 | |
|     name1_name2_name3
 | |
| 
 | |
|     """
 | |
|     jump=jump.replace('/','_').strip('_')
 | |
|     if not hasattr(self,jump):
 | |
|         return 0
 | |
|     done={}
 | |
|     todo=[]
 | |
|     while 1:
 | |
|         if jump!=None:
 | |
|             tmp=""
 | |
|             todo=[]
 | |
|             for i in jump.split("_"):
 | |
|                 tmp+=i+"_";
 | |
|                 if not done.has_key(tmp[:-1]):
 | |
|                     todo.append(tmp[:-1])
 | |
|             jump=None
 | |
|         elif len(todo):
 | |
|             i=todo.pop(0)
 | |
|             done[i]=1
 | |
|             if hasattr(self,i):
 | |
|                 f=getattr(self,i)
 | |
|                 r=f(*p)
 | |
|                 if isinstance(r,types.StringType):
 | |
|                     jump=r
 | |
|         else:
 | |
|             break
 | |
|     return 1
 | |
| 
 | |
| #----------------------------------------------------------
 | |
| # QWeb WSGI Request handler
 | |
| #----------------------------------------------------------
 | |
| class QWebSession(dict):
 | |
|     def __init__(self,environ,**kw):
 | |
|         dict.__init__(self)
 | |
|         default={
 | |
|             "path" : tempfile.gettempdir(),
 | |
|             "cookie_name" : "QWEBSID",
 | |
|             "cookie_lifetime" : 0,
 | |
|             "cookie_path" : '/',
 | |
|             "cookie_domain" : '',
 | |
|             "limit_cache" : 1,
 | |
|             "probability" : 0.01,
 | |
|             "maxlifetime" : 3600,
 | |
|             "disable" : 0,
 | |
|         }
 | |
|         for k,v in default.items():
 | |
|             setattr(self,'session_%s'%k,kw.get(k,v))
 | |
|         # Try to find session
 | |
|         self.session_found_cookie=0
 | |
|         self.session_found_url=0
 | |
|         self.session_found=0
 | |
|         self.session_orig=""
 | |
|         # Try cookie
 | |
|         c=Cookie.SimpleCookie()
 | |
|         c.load(environ.get('HTTP_COOKIE', ''))
 | |
|         if c.has_key(self.session_cookie_name):
 | |
|             sid=c[self.session_cookie_name].value[:64]
 | |
|             if re.match('[a-f0-9]+$',sid) and self.session_load(sid):
 | |
|                 self.session_id=sid
 | |
|                 self.session_found_cookie=1
 | |
|                 self.session_found=1
 | |
|         # Try URL
 | |
|         if not self.session_found_cookie:
 | |
|             mo=re.search('&%s=([a-f0-9]+)'%self.session_cookie_name,environ.get('QUERY_STRING',''))
 | |
|             if mo and self.session_load(mo.group(1)):
 | |
|                 self.session_id=mo.group(1)
 | |
|                 self.session_found_url=1
 | |
|                 self.session_found=1
 | |
|         # New session
 | |
|         if not self.session_found:
 | |
|             self.session_id='%032x'%random.randint(1,2**128)
 | |
|         self.session_trans_sid="&%s=%s"%(self.session_cookie_name,self.session_id)
 | |
|         # Clean old session
 | |
|         if random.random() < self.session_probability:
 | |
|             self.session_clean()
 | |
|     def session_get_headers(self):
 | |
|         h=[]
 | |
|         if (not self.session_disable) and (len(self) or len(self.session_orig)):
 | |
|             self.session_save()
 | |
|             if not self.session_found_cookie:
 | |
|                 c=Cookie.SimpleCookie()
 | |
|                 c[self.session_cookie_name] = self.session_id
 | |
|                 c[self.session_cookie_name]['path'] = self.session_cookie_path
 | |
|                 if self.session_cookie_domain:
 | |
|                     c[self.session_cookie_name]['domain'] = self.session_cookie_domain
 | |
| #               if self.session_cookie_lifetime:
 | |
| #                   c[self.session_cookie_name]['expires'] = TODO date localtime or not, datetime.datetime(1970, 1, 1)
 | |
|                 h.append(("Set-Cookie", c[self.session_cookie_name].OutputString()))
 | |
|             if self.session_limit_cache:
 | |
|                 h.append(('Cache-Control','no-store, no-cache, must-revalidate, post-check=0, pre-check=0'))
 | |
|                 h.append(('Expires','Thu, 19 Nov 1981 08:52:00 GMT'))
 | |
|                 h.append(('Pragma','no-cache'))
 | |
|         return h
 | |
|     def session_load(self,sid):
 | |
|         fname=os.path.join(self.session_path,'qweb_sess_%s'%sid)
 | |
|         try:
 | |
|             orig=file(fname).read()
 | |
|             d=pickle.loads(orig)
 | |
|         except:
 | |
|             return
 | |
|         self.session_orig=orig
 | |
|         self.update(d)
 | |
|         return 1
 | |
|     def session_save(self):
 | |
|         if not os.path.isdir(self.session_path):
 | |
|             os.makedirs(self.session_path)
 | |
|         fname=os.path.join(self.session_path,'qweb_sess_%s'%self.session_id)
 | |
|         try:
 | |
|             oldtime=os.path.getmtime(fname)
 | |
|         except OSError,IOError:
 | |
|             oldtime=0
 | |
|         dump=pickle.dumps(self.copy())
 | |
|         if (dump != self.session_orig) or (time.time() > oldtime+self.session_maxlifetime/4):
 | |
|             tmpname=os.path.join(self.session_path,'qweb_sess_%s_%x'%(self.session_id,random.randint(1,2**32)))
 | |
|             f=file(tmpname,'wb')
 | |
|             f.write(dump)
 | |
|             f.close()
 | |
|             if sys.platform=='win32' and os.path.isfile(fname):
 | |
|                 os.remove(fname)
 | |
|             os.rename(tmpname,fname)
 | |
|     def session_clean(self):
 | |
|         t=time.time()
 | |
|         try:
 | |
|             for i in [os.path.join(self.session_path,i) for i in os.listdir(self.session_path) if i.startswith('qweb_sess_')]:
 | |
|                 if (t > os.path.getmtime(i)+self.session_maxlifetime):
 | |
|                     os.unlink(i)
 | |
|         except OSError,IOError:
 | |
|             pass
 | |
| class QWebSessionMem(QWebSession):
 | |
|     def session_load(self,sid):
 | |
|         global _qweb_sessions
 | |
|         if not "_qweb_sessions" in globals():
 | |
|             _qweb_sessions={}
 | |
|         if _qweb_sessions.has_key(sid):
 | |
|             self.session_orig=_qweb_sessions[sid]
 | |
|             self.update(self.session_orig)
 | |
|             return 1
 | |
|     def session_save(self):
 | |
|         global _qweb_sessions
 | |
|         if not "_qweb_sessions" in globals():
 | |
|             _qweb_sessions={}
 | |
|         _qweb_sessions[self.session_id]=self.copy()
 | |
| class QWebSessionService:
 | |
|     def __init__(self, wsgiapp, url_rewrite=0):
 | |
|         self.wsgiapp=wsgiapp
 | |
|         self.url_rewrite_tags="a=href,area=href,frame=src,form=,fieldset="
 | |
|     def __call__(self, environ, start_response):
 | |
|         # TODO
 | |
|         # use QWebSession to provide environ["qweb.session"]
 | |
|         return self.wsgiapp(environ,start_response)
 | |
| class QWebDict(dict):
 | |
|     def __init__(self,*p):
 | |
|         dict.__init__(self,*p)
 | |
|     def __getitem__(self,key):
 | |
|         return self.get(key,"")
 | |
|     def int(self,key):
 | |
|         try:
 | |
|             return int(self.get(key,"0"))
 | |
|         except ValueError:
 | |
|             return 0
 | |
| class QWebListDict(dict):
 | |
|     def __init__(self,*p):
 | |
|         dict.__init__(self,*p)
 | |
|     def __getitem__(self,key):
 | |
|         return self.get(key,[])
 | |
|     def appendlist(self,key,val):
 | |
|         if self.has_key(key):
 | |
|             self[key].append(val)
 | |
|         else:
 | |
|             self[key]=[val]
 | |
|     def get_qwebdict(self):
 | |
|         d=QWebDict()
 | |
|         for k,v in self.items():
 | |
|             d[k]=v[-1]
 | |
|         return d
 | |
| class QWebRequest:
 | |
|     """QWebRequest a WSGI request handler.
 | |
| 
 | |
|     QWebRequest is a WSGI request handler that feature GET, POST and POST
 | |
|     multipart methods, handles cookies and headers and provide a dict-like
 | |
|     SESSION Object (either on the filesystem or in memory).
 | |
| 
 | |
|     It is constructed with the environ and start_response WSGI arguments:
 | |
|     
 | |
|       req=qweb.QWebRequest(environ, start_response)
 | |
|     
 | |
|     req has the folowing attributes :
 | |
|     
 | |
|       req.environ standard WSGI dict (CGI and wsgi ones)
 | |
|     
 | |
|     Some CGI vars as attributes from environ for convenience: 
 | |
|     
 | |
|       req.SCRIPT_NAME
 | |
|       req.PATH_INFO
 | |
|       req.REQUEST_URI
 | |
|     
 | |
|     Some computed value (also for convenience)
 | |
|     
 | |
|       req.FULL_URL full URL recontructed (http://host/query)
 | |
|       req.FULL_PATH (URL path before ?querystring)
 | |
|     
 | |
|     Dict constructed from querystring and POST datas, PHP-like.
 | |
|     
 | |
|       req.GET contains GET vars
 | |
|       req.POST contains POST vars
 | |
|       req.REQUEST contains merge of GET and POST
 | |
|       req.FILES contains uploaded files
 | |
|       req.GET_LIST req.POST_LIST req.REQUEST_LIST req.FILES_LIST multiple arguments versions
 | |
|       req.debug() returns an HTML dump of those vars
 | |
|     
 | |
|     A dict-like session object.
 | |
|     
 | |
|       req.SESSION the session start when the dict is not empty.
 | |
|     
 | |
|     Attribute for handling the response
 | |
|     
 | |
|       req.response_headers dict-like to set headers
 | |
|       req.response_cookies a SimpleCookie to set cookies
 | |
|       req.response_status a string to set the status like '200 OK'
 | |
|     
 | |
|       req.write() to write to the buffer
 | |
|     
 | |
|     req itselfs is an iterable object with the buffer, it will also also call
 | |
|     start_response automatically before returning anything via the iterator.
 | |
|     
 | |
|     To make it short, it means that you may use
 | |
|     
 | |
|       return req
 | |
|     
 | |
|     at the end of your request handling to return the reponse to any WSGI
 | |
|     application server.
 | |
|     """
 | |
|     #
 | |
|     # This class contains part ripped from colubrid (with the permission of
 | |
|     # mitsuhiko) see http://wsgiarea.pocoo.org/colubrid/
 | |
|     #
 | |
|     # - the class HttpHeaders
 | |
|     # - the method load_post_data (tuned version)
 | |
|     #
 | |
|     class HttpHeaders(object):
 | |
|         def __init__(self):
 | |
|             self.data = [('Content-Type', 'text/html')]
 | |
|         def __setitem__(self, key, value):
 | |
|             self.set(key, value)
 | |
|         def __delitem__(self, key):
 | |
|             self.remove(key)
 | |
|         def __contains__(self, key):
 | |
|             key = key.lower()
 | |
|             for k, v in self.data:
 | |
|                 if k.lower() == key:
 | |
|                     return True
 | |
|             return False
 | |
|         def add(self, key, value):
 | |
|             self.data.append((key, value))
 | |
|         def remove(self, key, count=-1):
 | |
|             removed = 0
 | |
|             data = []
 | |
|             for _key, _value in self.data:
 | |
|                 if _key.lower() != key.lower():
 | |
|                     if count > -1:
 | |
|                         if removed >= count:
 | |
|                             break
 | |
|                         else:
 | |
|                             removed += 1
 | |
|                     data.append((_key, _value))
 | |
|             self.data = data
 | |
|         def clear(self):
 | |
|             self.data = []
 | |
|         def set(self, key, value):
 | |
|             self.remove(key)
 | |
|             self.add(key, value)
 | |
|         def get(self, key=False, httpformat=False):
 | |
|             if not key:
 | |
|                 result = self.data
 | |
|             else:
 | |
|                 result = []
 | |
|                 for _key, _value in self.data:
 | |
|                     if _key.lower() == key.lower():
 | |
|                         result.append((_key, _value))
 | |
|             if httpformat:
 | |
|                 return '\n'.join(['%s: %s' % item for item in result])
 | |
|             return result
 | |
|     def load_post_data(self,environ,POST,FILES):
 | |
|         length = int(environ['CONTENT_LENGTH'])
 | |
|         DATA = environ['wsgi.input'].read(length)
 | |
|         if environ.get('CONTENT_TYPE', '').startswith('multipart'):
 | |
|             lines = ['Content-Type: %s' % environ.get('CONTENT_TYPE', '')]
 | |
|             for key, value in environ.items():
 | |
|                 if key.startswith('HTTP_'):
 | |
|                     lines.append('%s: %s' % (key, value))
 | |
|             raw = '\r\n'.join(lines) + '\r\n\r\n' + DATA
 | |
|             msg = email.message_from_string(raw)
 | |
|             for sub in msg.get_payload():
 | |
|                 if not isinstance(sub, email.Message.Message):
 | |
|                     continue
 | |
|                 name_dict = cgi.parse_header(sub['Content-Disposition'])[1]
 | |
|                 if 'filename' in name_dict:
 | |
|                     # Nested MIME Messages are not supported'
 | |
|                     if type([]) == type(sub.get_payload()):
 | |
|                         continue
 | |
|                     if not name_dict['filename'].strip():
 | |
|                         continue
 | |
|                     filename = name_dict['filename']
 | |
|                     # why not keep all the filename? because IE always send 'C:\documents and settings\blub\blub.png'
 | |
|                     filename = filename[filename.rfind('\\') + 1:]
 | |
|                     if 'Content-Type' in sub:
 | |
|                         content_type = sub['Content-Type']
 | |
|                     else:
 | |
|                         content_type = None
 | |
|                     s = { "name":filename, "type":content_type, "data":sub.get_payload() }
 | |
|                     FILES.appendlist(name_dict['name'], s)
 | |
|                 else:
 | |
|                     POST.appendlist(name_dict['name'], sub.get_payload())
 | |
|         else:
 | |
|             POST.update(cgi.parse_qs(DATA,keep_blank_values=1))
 | |
|         return DATA
 | |
| 
 | |
|     def __init__(self,environ,start_response,session=QWebSession):
 | |
|         self.environ=environ
 | |
|         self.start_response=start_response
 | |
|         self.buffer=[]
 | |
| 
 | |
|         self.SCRIPT_NAME = environ.get('SCRIPT_NAME', '')
 | |
|         self.PATH_INFO = environ.get('PATH_INFO', '')
 | |
|         # extensions:
 | |
|         self.FULL_URL = environ['FULL_URL'] = self.get_full_url(environ)
 | |
|         # REQUEST_URI is optional, fake it if absent
 | |
|         if not environ.has_key("REQUEST_URI"):
 | |
|             environ["REQUEST_URI"]=urllib.quote(self.SCRIPT_NAME+self.PATH_INFO)
 | |
|             if environ.get('QUERY_STRING'):
 | |
|                 environ["REQUEST_URI"]+='?'+environ['QUERY_STRING']
 | |
|         self.REQUEST_URI = environ["REQUEST_URI"]
 | |
|         # full quote url path before the ?
 | |
|         self.FULL_PATH = environ['FULL_PATH'] = self.REQUEST_URI.split('?')[0]
 | |
| 
 | |
|         self.request_cookies=Cookie.SimpleCookie()
 | |
|         self.request_cookies.load(environ.get('HTTP_COOKIE', ''))
 | |
| 
 | |
|         self.response_started=False
 | |
|         self.response_gzencode=False
 | |
|         self.response_cookies=Cookie.SimpleCookie()
 | |
|         # to delete a cookie use: c[key]['expires'] = datetime.datetime(1970, 1, 1)
 | |
|         self.response_headers=self.HttpHeaders()
 | |
|         self.response_status="200 OK"
 | |
| 
 | |
|         self.php=None
 | |
|         if self.environ.has_key("php"):
 | |
|             self.php=environ["php"]
 | |
|             self.SESSION=self.php._SESSION
 | |
|             self.GET=self.php._GET
 | |
|             self.POST=self.php._POST
 | |
|             self.REQUEST=self.php._ARG
 | |
|             self.FILES=self.php._FILES
 | |
|         else:
 | |
|             if isinstance(session,QWebSession):
 | |
|                 self.SESSION=session
 | |
|             elif session:
 | |
|                 self.SESSION=session(environ)
 | |
|             else:
 | |
|                 self.SESSION=None
 | |
|             self.GET_LIST=QWebListDict(cgi.parse_qs(environ.get('QUERY_STRING', ''),keep_blank_values=1))
 | |
|             self.POST_LIST=QWebListDict()
 | |
|             self.FILES_LIST=QWebListDict()
 | |
|             self.REQUEST_LIST=QWebListDict(self.GET_LIST)
 | |
|             if environ['REQUEST_METHOD'] == 'POST':
 | |
|                 self.DATA=self.load_post_data(environ,self.POST_LIST,self.FILES_LIST)
 | |
|                 self.REQUEST_LIST.update(self.POST_LIST)
 | |
|             self.GET=self.GET_LIST.get_qwebdict()
 | |
|             self.POST=self.POST_LIST.get_qwebdict()
 | |
|             self.FILES=self.FILES_LIST.get_qwebdict()
 | |
|             self.REQUEST=self.REQUEST_LIST.get_qwebdict()
 | |
|     def get_full_url(environ):
 | |
|         # taken from PEP 333
 | |
|         if 'FULL_URL' in environ:
 | |
|             return environ['FULL_URL']
 | |
|         url = environ['wsgi.url_scheme']+'://'
 | |
|         if environ.get('HTTP_HOST'):
 | |
|             url += environ['HTTP_HOST']
 | |
|         else:
 | |
|             url += environ['SERVER_NAME']
 | |
|             if environ['wsgi.url_scheme'] == 'https':
 | |
|                 if environ['SERVER_PORT'] != '443':
 | |
|                     url += ':' + environ['SERVER_PORT']
 | |
|             else:
 | |
|                 if environ['SERVER_PORT'] != '80':
 | |
|                     url += ':' + environ['SERVER_PORT']
 | |
|         if environ.has_key('REQUEST_URI'):
 | |
|             url += environ['REQUEST_URI']
 | |
|         else:
 | |
|             url += urllib.quote(environ.get('SCRIPT_NAME', ''))
 | |
|             url += urllib.quote(environ.get('PATH_INFO', ''))
 | |
|             if environ.get('QUERY_STRING'):
 | |
|                 url += '?' + environ['QUERY_STRING']
 | |
|         return url
 | |
|     get_full_url=staticmethod(get_full_url)
 | |
|     def save_files(self):
 | |
|         for k,v in self.FILES.items():
 | |
|             if not v.has_key("tmp_file"):
 | |
|                 f=tempfile.NamedTemporaryFile()
 | |
|                 f.write(v["data"])
 | |
|                 f.flush()
 | |
|                 v["tmp_file"]=f
 | |
|                 v["tmp_name"]=f.name
 | |
|     def debug(self):
 | |
|         body=''
 | |
|         for name,d in [
 | |
|             ("GET",self.GET), ("POST",self.POST), ("REQUEST",self.REQUEST), ("FILES",self.FILES),
 | |
|             ("GET_LIST",self.GET_LIST), ("POST_LIST",self.POST_LIST), ("REQUEST_LIST",self.REQUEST_LIST), ("FILES_LIST",self.FILES_LIST),
 | |
|             ("SESSION",self.SESSION), ("environ",self.environ),
 | |
|         ]:
 | |
|             body+='<table border="1" width="100%" align="center">\n'
 | |
|             body+='<tr><th colspan="2" align="center">%s</th></tr>\n'%name
 | |
|             keys=d.keys()
 | |
|             keys.sort()
 | |
|             body+=''.join(['<tr><td>%s</td><td>%s</td></tr>\n'%(k,cgi.escape(repr(d[k]))) for k in keys])
 | |
|             body+='</table><br><br>\n\n'
 | |
|         return body
 | |
|     def write(self,s):
 | |
|         self.buffer.append(s)
 | |
|     def echo(self,*s):
 | |
|         self.buffer.extend([str(i) for i in s])
 | |
|     def response(self):
 | |
|         if not self.response_started:
 | |
|             if not self.php:
 | |
|                 for k,v in self.FILES.items():
 | |
|                     if v.has_key("tmp_file"):
 | |
|                         try:
 | |
|                             v["tmp_file"].close()
 | |
|                         except OSError:
 | |
|                             pass
 | |
|                 if self.response_gzencode and self.environ.get('HTTP_ACCEPT_ENCODING','').find('gzip')!=-1:
 | |
|                     zbuf=StringIO.StringIO()
 | |
|                     zfile=gzip.GzipFile(mode='wb', fileobj=zbuf)
 | |
|                     zfile.write(''.join(self.buffer))
 | |
|                     zfile.close()
 | |
|                     zbuf=zbuf.getvalue()
 | |
|                     self.buffer=[zbuf]
 | |
|                     self.response_headers['Content-Encoding']="gzip"
 | |
|                     self.response_headers['Content-Length']=str(len(zbuf))
 | |
|                 headers = self.response_headers.get()
 | |
|                 if isinstance(self.SESSION, QWebSession):
 | |
|                     headers.extend(self.SESSION.session_get_headers())
 | |
|                 headers.extend([('Set-Cookie', self.response_cookies[i].OutputString()) for i in self.response_cookies])
 | |
|                 self.start_response(self.response_status, headers)
 | |
|             self.response_started=True
 | |
|         return self.buffer
 | |
|     def __iter__(self):
 | |
|         return self.response().__iter__()
 | |
|     def http_redirect(self,url,permanent=1):
 | |
|         if permanent:
 | |
|             self.response_status="301 Moved Permanently"
 | |
|         else:
 | |
|             self.response_status="302 Found"
 | |
|         self.response_headers["Location"]=url
 | |
|     def http_404(self,msg="<h1>404 Not Found</h1>"):
 | |
|         self.response_status="404 Not Found"
 | |
|         if msg:
 | |
|             self.write(msg)
 | |
|     def http_download(self,fname,fstr,partial=0):
 | |
| #       allow fstr to be a file-like object
 | |
| #       if parital:
 | |
| #           say accept ranages
 | |
| #           parse range headers...
 | |
| #           if range:
 | |
| #               header("HTTP/1.1 206 Partial Content");
 | |
| #               header("Content-Range: bytes $offset-".($fsize-1)."/".$fsize);
 | |
| #               header("Content-Length: ".($fsize-$offset));
 | |
| #               fseek($fd,$offset);
 | |
| #           else:
 | |
|         self.response_headers["Content-Type"]="application/octet-stream"
 | |
|         self.response_headers["Content-Disposition"]="attachment; filename=\"%s\""%fname
 | |
|         self.response_headers["Content-Transfer-Encoding"]="binary"
 | |
|         self.response_headers["Content-Length"]="%d"%len(fstr)
 | |
|         self.write(fstr)
 | |
| 
 | |
| #----------------------------------------------------------
 | |
| # QWeb WSGI HTTP Server to run any WSGI app
 | |
| # autorun, run an app as FCGI or CGI otherwise launch the server
 | |
| #----------------------------------------------------------
 | |
| class QWebWSGIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
 | |
|     def log_message(self,*p):
 | |
|         if self.server.log:
 | |
|             return BaseHTTPServer.BaseHTTPRequestHandler.log_message(self,*p)
 | |
|     def address_string(self):
 | |
|         return self.client_address[0]
 | |
|     def start_response(self,status,headers):
 | |
|         l=status.split(' ',1)
 | |
|         self.send_response(int(l[0]),l[1])
 | |
|         ctype_sent=0
 | |
|         for i in headers:
 | |
|             if i[0].lower()=="content-type":
 | |
|                 ctype_sent=1
 | |
|             self.send_header(*i)
 | |
|         if not ctype_sent:
 | |
|             self.send_header("Content-type", "text/html")
 | |
|         self.end_headers()
 | |
|         return self.write
 | |
|     def write(self,data):
 | |
|         try:
 | |
|             self.wfile.write(data)
 | |
|         except (socket.error, socket.timeout),e:
 | |
|             print e
 | |
|     def bufferon(self):
 | |
|         if not getattr(self,'wfile_buf',0):
 | |
|             self.wfile_buf=1
 | |
|             self.wfile_bak=self.wfile
 | |
|             self.wfile=StringIO.StringIO()
 | |
|     def bufferoff(self):
 | |
|         if self.wfile_buf:
 | |
|             buf=self.wfile
 | |
|             self.wfile=self.wfile_bak
 | |
|             self.write(buf.getvalue())
 | |
|             self.wfile_buf=0
 | |
|     def serve(self,type):
 | |
|         path_info, parameters, query = urlparse.urlparse(self.path)[2:5]
 | |
|         environ = {
 | |
|             'wsgi.version':         (1,0),
 | |
|             'wsgi.url_scheme':      'http',
 | |
|             'wsgi.input':           self.rfile,
 | |
|             'wsgi.errors':          sys.stderr,
 | |
|             'wsgi.multithread':     0,
 | |
|             'wsgi.multiprocess':    0,
 | |
|             'wsgi.run_once':        0,
 | |
|             'REQUEST_METHOD':       self.command,
 | |
|             'SCRIPT_NAME':          '',
 | |
|             'QUERY_STRING':         query,
 | |
|             'CONTENT_TYPE':         self.headers.get('Content-Type', ''),
 | |
|             'CONTENT_LENGTH':       self.headers.get('Content-Length', ''),
 | |
|             'REMOTE_ADDR':          self.client_address[0],
 | |
|             'REMOTE_PORT':          str(self.client_address[1]),
 | |
|             'SERVER_NAME':          self.server.server_address[0],
 | |
|             'SERVER_PORT':          str(self.server.server_address[1]),
 | |
|             'SERVER_PROTOCOL':      self.request_version,
 | |
|             # extention
 | |
|             'FULL_PATH':            self.path,
 | |
|             'qweb.mode':            'standalone',
 | |
|         }
 | |
|         if path_info:
 | |
|             environ['PATH_INFO'] = urllib.unquote(path_info)
 | |
|         for key, value in self.headers.items():
 | |
|             environ['HTTP_' + key.upper().replace('-', '_')] = value
 | |
|         # Hack to avoid may TCP packets
 | |
|         self.bufferon()
 | |
|         appiter=self.server.wsgiapp(environ, self.start_response)
 | |
|         for data in appiter:
 | |
|             self.write(data)
 | |
|             self.bufferoff()
 | |
|         self.bufferoff()
 | |
|     def do_GET(self):
 | |
|         self.serve('GET')
 | |
|     def do_POST(self):
 | |
|         self.serve('GET')
 | |
| class QWebWSGIServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
 | |
|     """ QWebWSGIServer
 | |
|         qweb_wsgi_autorun(wsgiapp,ip='127.0.0.1',port=8080,threaded=1)
 | |
|         A WSGI HTTP server threaded or not and a function to automatically run your
 | |
|         app according to the environement (either standalone, CGI or FastCGI).
 | |
| 
 | |
|         This feature is called QWeb autorun. If you want to  To use it on your
 | |
|         application use the following lines at the end of the main application
 | |
|         python file:
 | |
| 
 | |
|         if __name__ == '__main__':
 | |
|             qweb.qweb_wsgi_autorun(your_wsgi_app)
 | |
| 
 | |
|         this function will select the approriate running mode according to the
 | |
|         calling environement (http-server, FastCGI or CGI).
 | |
|     """
 | |
|     def __init__(self, wsgiapp, ip, port, threaded=1, log=1):
 | |
|         BaseHTTPServer.HTTPServer.__init__(self, (ip, port), QWebWSGIHandler)
 | |
|         self.wsgiapp = wsgiapp
 | |
|         self.threaded = threaded
 | |
|         self.log = log
 | |
|     def process_request(self,*p):
 | |
|         if self.threaded:
 | |
|             return SocketServer.ThreadingMixIn.process_request(self,*p)
 | |
|         else:
 | |
|             return BaseHTTPServer.HTTPServer.process_request(self,*p)
 | |
| def qweb_wsgi_autorun(wsgiapp,ip='127.0.0.1',port=8080,threaded=1,log=1,callback_ready=None):
 | |
|     if sys.platform=='win32':
 | |
|         fcgi=0
 | |
|     else:
 | |
|         fcgi=1
 | |
|         sock = socket.fromfd(0, socket.AF_INET, socket.SOCK_STREAM)
 | |
|         try:
 | |
|             sock.getpeername()
 | |
|         except socket.error, e:
 | |
|             if e[0] == errno.ENOTSOCK:
 | |
|                 fcgi=0
 | |
|     if fcgi or os.environ.has_key('REQUEST_METHOD'):
 | |
|         import fcgi
 | |
|         fcgi.WSGIServer(wsgiapp,multithreaded=False).run()
 | |
|     else:
 | |
|         if log:
 | |
|             print 'Serving on %s:%d'%(ip,port)
 | |
|         s=QWebWSGIServer(wsgiapp,ip=ip,port=port,threaded=threaded,log=log)
 | |
|         if callback_ready:
 | |
|             callback_ready()
 | |
|         try:
 | |
|             s.serve_forever()
 | |
|         except KeyboardInterrupt,e:
 | |
|             sys.excepthook(*sys.exc_info())
 | |
| 
 | |
| #----------------------------------------------------------
 | |
| # Qweb Documentation
 | |
| #----------------------------------------------------------
 | |
| def qweb_doc():
 | |
|     body=__doc__
 | |
|     for i in [QWebXml ,QWebHtml ,QWebForm ,QWebURL ,qweb_control ,QWebRequest ,QWebSession ,QWebWSGIServer ,qweb_wsgi_autorun]:
 | |
|         n=i.__name__
 | |
|         d=i.__doc__
 | |
|         body+='\n\n%s\n%s\n\n%s'%(n,'-'*len(n),d)
 | |
|     return body
 | |
| 
 | |
|     print qweb_doc()
 | |
| 
 | |
| #
 |