# -*- coding: utf-8 -*- __uas_identify__ = "application" __uas_version__ = "1.0b3" # This sample demonstrates the use of automatic speech recognition. # The caller says one of several options, and the application will # then collect - and read - the corresponding news headlines. from prosody.uas import Hangup, Error import sys import threading import copy # this application requires the feedparser module. # please ensure that this has been installed on the # machine that will run the UAS Launcher (and, thus, this # application). # http://sourceforge.net/projects/feedparser/ try: import feedparser except: raise Exception("This application requires the feedparser module. please ensure that this has been installed on the machine that will run the UAS Launcher and, thus, this application. http://sourceforge.net/projects/feedparser/") import time from cgi import escape # Please note that the RSS feed links can change. class Categories: ENTERTAINMENT = [('entertainment','http://feeds.bbci.co.uk/news/entertainment_and_arts/rss.xml')] SPORT = [('football' ,'http://feeds.bbci.co.uk/sport/0/football/rss.xml?edition=uk'), ('rugby_union' ,'http://feeds.bbci.co.uk/sport/0/rugby-union/rss.xml?edition=uk'), ('cricket' ,'http://feeds.bbci.co.uk/sport/0/cricket/rss.xml?edition=uk'), ('sport_front' ,'http://feeds.bbci.co.uk/sport/0/rss.xml?edition=uk')] NEWS = [('news_uk' ,'http://feeds.bbci.co.uk/news/rss.xml?edition=uk'), ('news_us' ,'http://feeds.bbci.co.uk/news/rss.xml?edition=us'), ('news_eu' ,'http://feeds.bbci.co.uk/news/world/europe/rss.xml'), ('news_front' ,'http://feeds.bbci.co.uk/news/rss.xml?edition=int')] class BBC_latest(threading.Thread): def __init__(self): threading.Thread.__init__(self) self._running = False self._stories = {} def get_stories(self): return copy.deepcopy(self._stories) def run(self): self._running = True refresh = 0 try: while self._running is True: if time.time() > refresh: for category, edition in Categories.ENTERTAINMENT: self._stories[category] = self._get_stories(edition) for category, edition in Categories.SPORT: self._stories[category] = self._get_stories(edition) for category, edition in Categories.NEWS: self._stories[category] = self._get_stories(edition) refresh = time.time() + 60 time.sleep(2) except Exception as exc: print('Get BBC story raised exception: {0}'.format(exc)) def quit(self): print("Quit BBC") self._running = False def _get_stories(self, edition): try: news = feedparser.parse(edition) except Exception as exc: print("FP: {0}".format(news)) return None # we will read only today's stories day_today = time.gmtime().tm_yday min_day = day_today - 7 try: title = news.feed.subtitle.split(',', 1)[0] except Exception as exc: print("EX1: {0}".format(exc)) title = 'BBC News' stories = [] try: print("Number of entries for {0} : {1}".format(title, len(news.entries))) # get one week's worth of stories while day_today > min_day: for ent in news.entries: try: enttime = ent["published_parsed"] if enttime.tm_yday == day_today: stories.append(ent["summary"]) except Exception as exc: print("EX2: {0} : {1}".format(ent, exc)) day_today -= 1 except Exception as exc: print("News: {0}".format(exc)) print("{0} : {1} : {2} stories".format(edition, title, len(stories))) return (title, stories) class Connection(): def __init__(self, stories): self._stories = stories self._currently_reading = None self._voices = ['Amy', 'Brian'] def get_story(self): if self._currently_reading is None: return None, None story = '' while story == '': try: story = self._stories[self._currently_reading][1].pop(0) except: return None, None # for alternating between voices if self._voices == []: self._voices = ['Amy', 'Brian'] return self._voices.pop(), story def get_title(self, which): try: print("Read: {0}".format(self._stories[which][0])) self._currently_reading = which num_stories = len(self._stories[which][1]) title = self._stories[which][0] + " There are {0} stories. Say quit to exit or back to interrupt.".format(num_stories or 'no') except: title = 'Sorry, there was an error getting your news. Perhaps the R.S.S. link has changed.' return title def say_tts(channel, message, bargein=False): if bargein: channel.FilePlayer.say_start(message) else: channel.FilePlayer.say(message) def tts_str(string, voice='Amy'): # run escape on the string in case it contains XML reserved characters, e.g. < > & string = escape(string) # Here we demonstrate setting a TTS engine and voice. The TTS engines and voices available on # your account may be different to those shown below. return u"{1}".format(voice, string) def run_asr(channel, grammar, prompt): if channel.SpeechDetector.prime(grammar, channel.SpeechDetector.SpeechDetectorTrigger.ONPLAYSTART) is False: raise Error("Speech detector failed to start") channel.DTMFDetector.clear_digits() say_tts(channel, prompt, bargein=True) # Now get the recognised speech, can be words or DTMF digits response = channel.SpeechDetector.get_recognised_speech() if channel.SpeechDetector.cause() == channel.SpeechDetector.Cause.BARGEIN: choice = channel.DTMFDetector.get_digits(count=1) else: choice = response.get_recognised_words_as_string() return choice def get_choice(channel): which = None my_grammar = channel.SpeechDetector.Grammar("entertainment | sport | news | -garbage-;") prompt = tts_str("Choose between entertainment, sport or news.") choice = run_asr(channel, my_grammar, prompt) if choice == 'news' or choice == '1': prompt = tts_str("Choose between united kingdom, united states, europe or world.") my_grammar = channel.SpeechDetector.Grammar("united kingdom | united states | europe | world | -garbage- -garbage-;") choice = run_asr(channel, my_grammar, prompt) digit = {"united kingdom":0, '1':0, "united states":1, '2':1, "europe":2, "3":2, "world":3, "4":3}.get(choice) if digit: which = Categories.NEWS[digit][0] elif choice == 'sport' or choice == '2': prompt = tts_str("Choose between football, rugby union, cricket or top stories.") my_grammar = channel.SpeechDetector.Grammar("football | rugby union | cricket | top stories | -garbage- -garbage-;") choice = run_asr(channel, my_grammar, prompt) digit = {"football":0, '1':0, "rugby union":1, '2':1, "cricket":2, "3":2, "top stories":3, "4":3}.get(choice) if digit: which = Categories.SPORT[digit][0] elif choice == 'entertainment' or choice == '3': which = Categories.ENTERTAINMENT[0][0] return which def read_stories(channel, gbbc): which = None while which is None: which = get_choice(channel) if not which: say_tts(channel, tts_str("Sorry, I did not understand that.")) my_stories = Connection(gbbc.get_stories()) title = my_stories.get_title(which) say_tts(channel, tts_str(u"{0}".format(title))) voice, story = my_stories.get_story() while story: if channel.SpeechDetector.start(channel.SpeechDetector.Grammar("quit | back;")) is False: raise Error("Speech detector failed to start") say_tts(channel, tts_str(u"{0}".format(story), voice)) response = channel.SpeechDetector.get_recognised_speech(seconds_timeout=1) word = response.get_recognised_words_as_string() channel.SpeechDetector.stop() if word == 'quit': return False if word == 'back': return True voice, story = my_stories.get_story() def main(channel, application_instance_id, file_man, my_log, application_parameters): my_log.info("Get BBC news started") try: which = None gbbc = BBC_latest() gbbc.start() channel.ring(3) channel.answer() say_tts(channel, tts_str("Hello, this is the BBC top stories reader.")) while True: if read_stories(channel, gbbc) is False: break say_tts(channel, tts_str("good bye")) channel.hang_up() my_log.info("Get news completed") except Hangup as exc: my_log.info("Get news completed with Hangup") return 0 except Error as exc: my_log.exception("Get news completed with Error exception! {0}".format(exc)) return -101 except Exception as exc: my_log.exception("Get news completed with exception! {0}".format(exc)) return -102 finally: gbbc.quit() if channel.state() != channel.State.IDLE: channel.hang_up() return 0