#! /usr/bin/python
"""Interactive "search bot".

Questions and answers are ``Info`` objects carrying a hierarchical ``Type``.
``SearchLet`` objects turn a question into new ``Info`` objects (by fetching a
URL or running a command), and ``DispatchEngine`` keeps feeding the results
back to the searchlets until their relevance drops below zero.

The module is written so that it can be imported by Python 2.6+ and Python 3.
"""

#class synchronizer:

import os
import re
import shlex
import string
import subprocess
import sys
import threading
import urllib
from io import BytesIO

try:
    import readline
except ImportError:  # platform without readline: no history, no completion
    readline = None

if sys.version_info[0] == 2:
    import thread
    from popen2 import popen2
    from urllib import FancyURLopener, quote_plus, urlencode, urlopen
else:
    import _thread as thread
    import urllib.parse
    import urllib.request
    from urllib.parse import quote_plus, urlencode
    from urllib.request import urlopen
    # FancyURLopener is deprecated on Python 3; fall back to ``object`` so
    # that AppURLopener can still be defined where it no longer exists.
    FancyURLopener = getattr(urllib.request, "FancyURLopener", object)


class AppURLopener(FancyURLopener):
    """URL opener that announces itself as "Mozilla"."""

    def __init__(self, *args):
        self.version = "Mozilla"
        FancyURLopener.__init__(self, *args)


# Make every urlopen() call of this module send the "Mozilla" user agent.
if sys.version_info[0] == 2:
    urllib._urlopener = AppURLopener()
else:
    _opener = urllib.request.build_opener()
    _opener.addheaders = [("User-agent", "Mozilla")]
    urllib.request.install_opener(_opener)

# NOTE(review): SystemCommandLet._answer referenced NET and URL, but neither
# name was defined anywhere in the file (NameError on every call).  The values
# below are an assumption chosen to match the "<a:b>" type syntax -- confirm.
NET = "net"
URL = "url"


def _to_text(data):
    """Return *data* as ``str``, decoding it when it is a bytes object."""
    if isinstance(data, str):
        return data
    return data.decode("utf-8", "replace")


def _format_argv(template, value):
    """Split *template* like a shell would and substitute *value* into it.

    The template is split first and *value* is inserted afterwards, so the
    value always ends up in exactly one argument, whatever it contains.
    """
    argv = []
    for word in shlex.split(template):
        if "%" in word:
            try:
                word = word % (value,)
            except TypeError:
                # Only escaped percents ("%%") in this word: nothing to insert.
                word = word % ()
        argv.append(word)
    return argv


class Type:
    """A type is a leaf in a tree, stored as the path from the root.

    It can be built from a string such as ``"<net:url>"`` (giving the path
    ``["net", "url"]``), from another ``Type`` or from a sequence of names.
    ``"<>"`` and ``""`` both give the empty path.

    The ordering operators compare paths by prefix: ``a < b`` is true when the
    path of ``a`` is a strict prefix of the path of ``b``.
    """

    def __init__(self, typelist):
        if isinstance(typelist, str):
            inner = typelist[1:-1]  # drop the surrounding "<" and ">"
            if inner:
                self.typelist = inner.split(":")
            else:
                self.typelist = []
        elif isinstance(typelist, Type):
            # Copy, so that the two objects never share one list.
            self.typelist = list(typelist.typelist)
        else:
            self.typelist = list(typelist)

    def __str__(self):
        return "<%s>" % ":".join(self.typelist)

    def __repr__(self):
        return "<%s>" % ":".join(self.typelist)

    def _is_prefix_of(self, other):
        """Tell whether this path starts the path of *other* (or equals it)."""
        return other.typelist[:len(self.typelist)] == self.typelist

    def __eq__(self, other):
        if not isinstance(other, Type):
            return NotImplemented
        return self.typelist == other.typelist

    def __ne__(self, other):
        if not isinstance(other, Type):
            return NotImplemented
        return self.typelist != other.typelist

    def __lt__(self, other):
        if not isinstance(other, Type):
            return NotImplemented
        return (len(self.typelist) < len(other.typelist)
                and self._is_prefix_of(other))

    def __le__(self, other):
        if not isinstance(other, Type):
            return NotImplemented
        return self._is_prefix_of(other)

    def __gt__(self, other):
        if not isinstance(other, Type):
            return NotImplemented
        return (len(self.typelist) > len(other.typelist)
                and other._is_prefix_of(self))

    def __ge__(self, other):
        if not isinstance(other, Type):
            return NotImplemented
        return other._is_prefix_of(self)

    def __hash__(self):
        return hash(self.__str__())


class MatchingPolicy:
    """ This is what says what is good to search or what can be given
    as an answer.  The base policy refuses everything. """

    def __init__(self):
        pass

    def can_answer(self, qtype, intype):
        return 0

    def has_answered(self, atype, outtype):
        return 0


class MatchInherit(MatchingPolicy):
    """Accept when the second type is a prefix of (or equal to) the first."""

    def can_answer(self, qtype, intype):
        return intype <= qtype

    def has_answered(self, atype, outtype):
        return atype <= outtype


class MatchExact(MatchingPolicy):
    """Accept only when both types are equal."""

    def can_answer(self, qtype, intype):
        return intype == qtype

    def has_answered(self, atype, outtype):
        return atype == outtype


class Browser:
    """ This calls a browser; ``command`` is the command line template. """
    command = 'browser "%s"'

    def __init__(self):
        pass

    def openurl(self, url):
        """Run the browser command on *url* and wait for it to finish."""
        print("exec[%s]" % (self.command % url))
        try:
            # No shell: the URL may come from a web page and must not be
            # interpreted as shell syntax.
            subprocess.call(_format_argv(self.command, url))
        except OSError:
            print("cannot run browser: %s" % sys.exc_info()[1])


class GaleonTab(Browser):
    command = 'galeon -n "%s"'


##
## Info
##

class Info:
    """ This is a question or an answer """
    # Translation table replacing every character outside 32..127 by a dot.
    DISP = "." * 32 + "".join([chr(c) for c in range(32, 128)]) + "." * 128

    def __init__(self, type, value, relevance, related="", history=None,
                 url="", browser=GaleonTab()):
        self.type = Type(type)
        self.value = value
        self.relevance = relevance
        self.url = url
        self.related = related
        if history is None:
            self.history = []
        else:
            self.history = history
        self.browser = browser

    def browse(self):
        """Open the URL of this info with its browser."""
        self.browser.openurl(self.url)

    def __str__(self):
        """One-line summary: relevance, URL flag, history length, type,
        related value and the (sanitized) value."""
        if self.url:
            u = 'U'
        else:
            u = 'u'
        if len(self.history) < 10:
            h = str(len(self.history))
        else:
            h = '+'
        return "[%3i|%c%c] %.15s %.15s's %.70s" % (
            self.relevance, u, h, str(self.type), self.related,
            self.value.translate(self.DISP))

    def dump(self):
        """Return a multi-line description of every field."""
        return "---[begining of dump]---\n%s\nTYPE=%s\nRELEVANCE=%i\nURL=%s\nRELATED=%s\nHISTORY=%s\n---[end of dump]---" % (
            self.value, str(self.type), self.relevance, self.url,
            self.related, str(self.history))


##
## SearchLets
##

class SearchLet:
    """ This has the ability to search for an answer """

    def __init__(self, name, intype, outtype, policy=MatchInherit()):
        self.name = name
        self.intype = Type(intype)
        self.outtype = Type(outtype)
        self.policy = policy

    def answer(self, question):
        """Return the list of answers, or [] when the type does not match."""
        if not self.can_answer(question):
            return []
        return self._answer(question)

    def _answer(self, question):
        return []

    def help(self, text):
        return ["No help available"]

    def command(self, text):
        return ["Command not implemented"]

    def can_answer(self, info):
        return self.policy.can_answer(info.type, self.intype)

    def has_answered(self, info):
        return self.policy.has_answered(info.type, self.outtype)


class ParseFileLet(SearchLet):
    """ Everything that comes from a file object and needs to be parsed
    with a regexp.

    Subclasses provide ``getfile``.  Each line matching ``regexp`` yields one
    answer: the match expanded with ``format`` when given, else the line.
    Without a regexp every line is an answer.  ``atn`` is added to the
    relevance of the question for the first answer and ``relatn`` once more
    for each following answer.
    """
    # Translation table replacing every character outside 32..127 by a space.
    DISP = " " * 32 + "".join([chr(c) for c in range(32, 128)]) + " " * 128

    def __init__(self, name, intype, outtype, source, regexp="", format="",
                 atn=-20, relatn=-5, policy=MatchInherit()):
        SearchLet.__init__(self, name, intype, outtype, policy)
        self.source = source
        if regexp:
            self.r = re.compile(regexp)
        else:
            self.r = None
        self.format = format
        self.atn = atn
        self.relatn = relatn

    def getfile(self, source, question):
        """Return a file-like object for *question*, or None."""
        return None

    def get_parsed_source(self, question):
        """Read the source for *question* and return the selected lines."""
        f = self.getfile(self.source, question.value)
        if not f:
            return []
        try:
            lines = [_to_text(line) for line in f.readlines()]
        finally:
            f.close()  # also released when reading fails
        if not self.r:
            return lines
        selected = []
        for line in lines:
            m = self.r.search(line)
            if m:
                if self.format:
                    selected.append(m.expand(self.format))
                else:
                    selected.append(line)
        return selected

    def _answer(self, question):
        ans = []
        score = question.relevance + self.atn
        for a in self.get_parsed_source(question):
            ans.append(Info(type=self.outtype, value=a, relevance=score,
                            related=question.value,
                            history=question.history + [question.value],
                            url=a))
            score += self.relatn
        return ans


class WebLet(ParseFileLet):
    """Fetch ``source`` as it is; the question is not used in the request."""

    def getfile(self, source, question):
        print("connect to [%.70s]" % source)
        return urlopen(source)


class WebGetLet(ParseFileLet):
    """Fetch ``source`` with the URL-quoted question substituted into it."""

    def getfile(self, source, question):
        r = source % quote_plus(question.translate(self.DISP))
        print("connect to [%s]" % r)
        return urlopen(r)


class WebPostLet(WebLet):
    """POST ``postdata`` plus the question (as ``queryvar``) to ``source``."""

    def __init__(self, name, intype, outtype, source, queryvar, regexp="",
                 format="", atn=-20, relatn=-5, postdata=[],
                 policy=MatchInherit()):
        WebLet.__init__(self, name, intype, outtype, source, regexp, format,
                        atn, relatn, policy)
        self.queryvar = queryvar
        self.postdata = postdata

    def getfile(self, source, question):
        post = urlencode(list(self.postdata) + [(self.queryvar, question)])
        if not isinstance(post, bytes):
            post = post.encode("ascii")  # Python 3 wants bytes as POST data
        print("connect to [%.70s]" % source)
        return urlopen(source, post)


class SystemCommandLet(ParseFileLet):
    """Run ``source`` as a command, with the question substituted into it.

    The command is executed directly, without a shell: the template is split
    into arguments and the question fills exactly one of them, so shell
    syntax in the template (pipes, redirections) is not interpreted.
    """

    def getfile(self, request, question):
        """Run the command and return its standard output as a file object.

        Returns None when the command cannot be started.
        """
        com = request % question
        print("exec [%.70s]" % com)
        try:
            proc = subprocess.Popen(_format_argv(request, question),
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE)
        except OSError:
            print("cannot run command: %s" % sys.exc_info()[1])
            return None
        # communicate() closes stdin, reads everything and reaps the child.
        output = proc.communicate()[0]
        return BytesIO(output)

    def _answer(self, question):
        """Return a single answer holding the whole (space-joined) output."""
        p = self.get_parsed_source(question)
        score = question.relevance + self.atn
        if question.type == Type([NET, URL]):
            u = question.value
        else:
            u = ""
        return [Info(type=self.outtype, value=" ".join(p), relevance=score,
                     related=question.value,
                     history=question.history + [question.value], url=u)]


##
## Engine
##

class Commands:
    """Command words understood by the engine.

    ``commands`` lists the values of the public string attributes of the
    class; it is used for completion.
    """

    def __init__(self):
        # Select on the attribute *name*: selecting on the value would also
        # pick up "__module__" whenever the module name has no underscore.
        self.commands = sorted(
            [value for name, value in vars(self.__class__).items()
             if not name.startswith("_") and isinstance(value, str)])

    ASK = "ask"
    BROWSE = "browse"
    EXPLORE = "explore"


class DispatchEngine:
    """ This will get questions, ask them to searchlets and give answers"""

    def __init__(self, commands=Commands(), policy=MatchInherit()):
        self.types = {}
        self.searchlets = []
        self.policy = policy
        self.com = commands
        self.last_answer = []

    def add_searchlet(self, slet):
        """Register *slet*, also indexing it by its input type."""
        self.searchlets.append(slet)
        self.types.setdefault(slet.intype, []).append(slet)

    def _answer(self, question, atype):
        """Explore from *question* and return the answers of type *atype*.

        Every result with a non-negative relevance is asked again.  The
        returned list is sorted by decreasing relevance.
        """
        bucket = [question]
        answers = []
        while bucket:
            q = bucket.pop()
            print("Take %s" % q)
            found = []
            for s in self.searchlets:
                if s.can_answer(q):
                    try:
                        found.extend(s.answer(q))
                    except (IOError, OSError):
                        # One unreachable source must not abort the query.
                        print("[%s] failed: %s" % (s.name, sys.exc_info()[1]))
            for a in found:
                report = ["Found", str(a)]
                if self.policy.has_answered(atype, a.type):
                    report.append("ok for anwser")
                    answers.append(a)
                if a.relevance >= 0:
                    report.append("ok for another turn")
                    bucket.append(a)
                report.append(".")
                print(" ".join(report))
        answers.sort(key=lambda a: a.relevance, reverse=True)
        return answers

    def _index_arg(self, words):
        """Return the 0-based index given as second word, 0 by default."""
        try:
            return int(words[1]) - 1
        except (IndexError, ValueError):
            return 0

    def answer(self, rawquestion):
        """Execute one command line.

        ``ask <intype> <wantedtype> [relevance] words...`` runs a search and
        returns its answers; ``explore [n]`` prints the dump of a previous
        answer and ``browse [n]`` opens it.  Everything else returns [].
        """
        q = rawquestion.split()
        if not q:
            return []
        if q[0] == self.com.ASK:
            if len(q) < 3:
                print("usage: %s <intype> <wantedtype> [relevance] question"
                      % self.com.ASK)
                return []
            asktype = Type(q[1])
            wantedtype = Type(q[2])
            words = q[3:]
            rel = 100
            if words:
                # An optional leading integer gives the starting relevance.
                try:
                    rel = int(words[0])
                except ValueError:
                    pass
                else:
                    words = words[1:]
            quest = Info(asktype, " ".join(words), rel)
            self.last_answer = self._answer(quest, wantedtype)
            return self.last_answer
        elif q[0] == self.com.EXPLORE:
            n = self._index_arg(q)
            try:
                print(self.last_answer[n].dump())
            except IndexError:
                pass
        elif q[0] == self.com.BROWSE:
            n = self._index_arg(q)
            try:
                self.last_answer[n].browse()
            except IndexError:
                pass
        return []

    def complete(self, text, state):
        """Completer with the signature expected by readline.set_completer."""
        if state == 0:
            self.matches = self.get_matches(text)
        try:
            return self.matches[state]
        except IndexError:
            return None

    def get_matches(self, text):
        """Return the command words starting with *text*."""
#        if " " in text.lstrip():
#            return []
        return [w for w in self.com.commands if w.startswith(text)]


def main():
    """Register the searchlets and run the interactive "SB> " prompt."""
    import atexit

    engine = DispatchEngine()
    # NOTE(review): several Type("") arguments below look like type names
    # that were lost from the source; they are kept exactly as found, which
    # makes them equal to the root type "<>" -- confirm the intended types.
    searchlets = [
        WebGetLet("google", Type("<>"), Type(""),
                  source="http://www.google.com/search?q=%.200s",
                  # NOTE(review): the text between "^" and "]*)>" was missing
                  # from the source (the literal was broken across lines);
                  # the pattern below is a guess -- confirm.
                  regexp="^<p><a href=([^>]*)>",
                  format="\\1",
                  atn=-20, relatn=-5),
        WebGetLet("netcraft", Type(""), Type(""),
                  source="http://www.netcraft.com/?restriction=site+contains&host=%s&position=limited",
                  regexp='HREF="([^"]*).*COLOR',
                  format="\\1",
                  atn=-30, relatn=0),
#        SystemCommandLet("lynx",Type(""), Type(""),
#                         source="lynx -crawl -dump %s",
#                         atn=-100, relatn=-20),
        SystemCommandLet("whois", Type(""), Type(""),
                         source='whois "%s"',
                         atn=-30, relatn=0),
    ]
    for s in searchlets:
        engine.add_searchlet(s)

    if readline is not None:
        # expanduser() also works when HOME is not set in the environment.
        histfile = os.path.join(os.path.expanduser("~"), ".searchbot2_history")
        try:
            readline.read_history_file(histfile)
        except IOError:
            pass
        atexit.register(readline.write_history_file, histfile)
        readline.set_completer(engine.complete)
        readline.parse_and_bind('tab: complete')

    try:
        read_line = raw_input
    except NameError:  # Python 3
        read_line = input

    try:
        while 1:
            line = read_line("SB> ")
            try:
                ans = engine.answer(line)
            except KeyboardInterrupt:
                ans = []
            for i, a in enumerate(ans):
                print("%2s: %s" % (i + 1, str(a)))
    except EOFError:
        pass
    except KeyboardInterrupt:
        pass
    print("\nBye.")


if __name__ == "__main__":
    main()