""" An advanced method of funneling phonemes.

This script will reduce the movement of the mouth and make the
lip-synchronization less snappy by removing unnecessary phonemes and adjusting
the curves. Some curves will be marked as owned-by-user, and events will be
inserted for tongue-only movements. The events will be ignored unless the
appropriate animations exist in your character. For the default mapping you can
simply import or mount the SpeechSnippets.animset file that is included with
the sample content.

Owner: John Briggs

Copyright (c) 2002-2011 OC3 Entertainment, Inc.
"""

import FxHelperLibrary
from FxStudio import issueCommand
from FxPhonemes import PHONEME_REGISTRY
from FxNewPhonemeList import PhonemeList, PhonemeInList, WordList
from FxHelperLibrary import group_to_word, set_overall_progress, set_task_name, set_task_progress
from FxFixPlateaus import fix_plateaus

# TODO debugging only. Uncomment the three lines below (and the call inside
# log_debug) to write a debug trace to advanced_funnel.log.
#import logging
#LOG_FILENAME = 'advanced_funnel.log'
#logging.basicConfig(filename=LOG_FILENAME,level=logging.DEBUG, filemode='w')

def log_debug(msg):
    """ Debug logging hook; currently a no-op.

    - msg is the text that would be logged when debugging is enabled.
    """
    #logging.debug(msg)
    pass

# A dictionary to map a set of phonemes to the simplified phoneme archetype.
# Keys are archetype names; values are the FaceFX phoneme codings that collapse
# into that archetype.
# NOTE(review): 'DH' is listed under both 'T' and 'TH'; which archetype it ends
# up with depends on how assign_archetypes()/set_id() resolve duplicates.
SIMPLIFIED_MAPPING = {
    'SIL' : set(['SIL']),
    'B' : set(['P', 'B', 'M']),
    'V' : set(['F', 'V', 'PH']),
    'T' : set(['TS', 'T', 'D', 'L', 'DH', 'RA', 'FLAP']),
    'N' : set(['N', 'NG']),
    'K' : set(['K', 'G', 'RU', 'CX', 'X', 'GH']),
    'CH' : set(['CH', 'JH', 'SH', 'ZH']),
    'TH' : set(['TH', 'DH']),
    'S' : set(['S', 'Z']),
    'H' : set(['H', 'HH']),
    'R' : set(['R']), # might combine with ER
    'Y' : set(['Y']),
    'W' : set(['W']),
    'IY' : set(['IY']),#widest
    'E' : set(['E', 'EN']),#min wide
    'A' : set(['A', 'AA', 'AAN']),#neutral
    'UW' : set(['AO', 'AON', 'O', 'ON', 'UW', 'EU', 'OE', 'OEN', 'UU', 'UH', 'OY', 'OW']),#narrow
    'IH' : set(['EH', 'AH', 'IH', 'AX', 'UX', 'AE', 'EY', 'AW', 'AY']),#wider
    'ER' : set(['ER', 'AXR', 'EXR']),#shch
    'UY' : set(['UY'])}#narrowest

# Every archetype name, in the mapping's iteration order.
ALL_ARCHETYPES = list(SIMPLIFIED_MAPPING)

MIN_DURATION = 0.01
SPLIT_DURATION = 0.05
SHORT_SILENCE = 0.20
EVENT_SHIFT = 0.04
RAMP_IN = 0.25
EPSILON = 0.001

# Events queued by DropEventAtOnset while rules run; reduce_selected_animation()
# resets this list and later hands it to apply_cached_events().
cached_event_list = []


# Actions classes

class Action(object):
    """ The base class for any action applied to a phoneme list. """

    def facefx_to_id(self, facefx_coding):
        """ Returns the phoneme id registered for the FaceFX coding string. """
        return PHONEME_REGISTRY.findPhonemeTypeByFaceFXCoding(facefx_coding).phonemeId

    def id_to_facefx(self, id):
        """ Returns the FaceFX coding string registered for the phoneme id. """
        return PHONEME_REGISTRY[id].facefxCoding

    def description(self):
        """ Returns a human-readable description of what the action will do. """
        return "Unimplemented description() method."

    def __call__(self, phoneme_list, index):
        """ Executes the action on the phoneme list.

        - phoneme_list is the mutable list of phonemes being edited.
        - index is the position at which the owning rule matched; each action
          adds its own offset to it.
        """
        raise RuntimeError("Unimplemented __call__ method")


class SetID(Action):
    """ Sets the phoneme id at offset to the specified ID. """

    def __init__(self, offset, facefx_coding):
        self.offset = offset
        self.new_id = self.facefx_to_id(facefx_coding)

    def description(self):
        return "Set at offset %d to %s" % (self.offset, self.id_to_facefx(self.new_id))

    def __call__(self, phoneme_list, index):
        phoneme_list[index + self.offset].set_id(self.new_id, SIMPLIFIED_MAPPING)


class DeleteGiveToPreceeding(Action):
    """ Removes the phoneme at offset, giving its time to the preceeding.

    Only the entry is deleted; the successor keeps its start time.
    NOTE(review): this relies on phoneme durations being derived from the next
    phoneme's start time - confirm against PhonemeList.
    """

    def __init__(self, offset):
        self.offset = offset

    def description(self):
        return "Delete phoneme offset %d, give time to PRECEEDING" % self.offset

    def __call__(self, phoneme_list, index):
        del phoneme_list[index + self.offset]


class DeleteGiveToSucceeding(Action):
    """ Removes the phoneme at offset, giving its time to the successor.

    The successor inherits the removed phoneme's start time. Raises IndexError
    if the removed phoneme was the last one in the list.
    """

    def __init__(self, offset):
        self.offset = offset

    def description(self):
        return "Delete phoneme offset %d, give time to SUCCEEDING" % self.offset

    def __call__(self, phoneme_list, index):
        target = index + self.offset
        inherited_start = phoneme_list[target].start_time
        del phoneme_list[target]
        # The former successor now sits at the same position.
        phoneme_list[target].start_time = inherited_start


class InsertBefore(Action):
    """ Insert the phoneme before the offset.

    The new phoneme takes over the existing phoneme's start time and the
    existing phoneme is pushed later by `duration` seconds.
    """

    def __init__(self, offset, facefx_coding, duration):
        self.offset = offset
        self.new_id = self.facefx_to_id(facefx_coding)
        self.duration = duration

    def description(self):
        return "Insert %0.2fs %s before offset %d" % (
            self.duration, self.id_to_facefx(self.new_id), self.offset)

    def __call__(self, phoneme_list, index):
        target = index + self.offset
        new_start = phoneme_list[target].start_time
        phoneme_list[target].start_time += self.duration
        phoneme_list.insert(target, PhonemeInList((self.new_id, new_start)))
        # set the archetype.
        phoneme_list[target].set_id(self.new_id, SIMPLIFIED_MAPPING)


class InsertAfter(Action):
    """ Inserts the phoneme after the offset.

    The new phoneme starts `duration` seconds before the start of the phoneme
    that currently follows the offset.
    """

    def __init__(self, offset, facefx_coding, duration):
        self.offset = offset
        self.new_id = self.facefx_to_id(facefx_coding)
        self.duration = duration

    def description(self):
        return "Insert %0.2fs %s after offset %d" % (
            self.duration, self.id_to_facefx(self.new_id), self.offset)

    def __call__(self, phoneme_list, index):
        target = index + self.offset + 1
        new_start = phoneme_list[target].start_time - self.duration
        phoneme_list.insert(target, PhonemeInList((self.new_id, new_start)))
        # set the archetype.
        phoneme_list[target].set_id(self.new_id, SIMPLIFIED_MAPPING)


class Split(Action):
    """ Splits a given phoneme in two at the midpoint. """

    def __init__(self, offset, first_facefx_coding, second_facefx_coding):
        self.offset = offset
        self.first_id = self.facefx_to_id(first_facefx_coding)
        self.second_id = self.facefx_to_id(second_facefx_coding)

    def description(self):
        return "Split %d into [%s, %s]" % (
            self.offset, self.id_to_facefx(self.first_id), self.id_to_facefx(self.second_id))

    def __call__(self, phoneme_list, index):
        curr = index + self.offset
        following = curr + 1
        first_start = phoneme_list[curr].start_time
        duration = phoneme_list[following].start_time - first_start
        second_start = first_start + (duration / 2.0)
        phoneme_list.insert(following, PhonemeInList((self.second_id, second_start)))
        phoneme_list[curr].set_id(self.first_id, SIMPLIFIED_MAPPING)
        phoneme_list[following].set_id(self.second_id, SIMPLIFIED_MAPPING)


class SetDuration(Action):
    """ Sets the duration of the given phoneme.

    Implemented by moving the start time of the following phoneme.
    """

    def __init__(self, offset, duration):
        self.offset = offset
        self.duration = duration

    def description(self):
        return "Set duration of offset %d to %0.2fs" % (self.offset, self.duration)

    def __call__(self, phoneme_list, index):
        curr = index + self.offset
        phoneme_list[curr + 1].start_time = phoneme_list[curr].start_time + self.duration


class DropEventAtOnset(Action):
    """ Drops an event with the start time equal to the onset of the phoneme.

    - anim_name is the name of the animation to place. When omitted, the
      phoneme's own FaceFX coding is used as the animation name.

    The event is not issued immediately; a tuple
    (start_time, anim, dur_scale, mag_scale, blend_in, blend_out) is appended
    to the module-level cached_event_list.
    """

    def __init__(self, offset, anim_name=None):
        self.offset = offset
        self.anim_name = anim_name

    def description(self):
        return "Add %s event at onset of %d" % (
            self.anim_name if self.anim_name else 'phoneme', self.offset)

    def get_duration(self, index, phoneme_list):
        """ Returns the duration of the phoneme at index.

        Falls back to 1.0 when there is no following phoneme to measure
        against.
        """
        try:
            return phoneme_list[index + 1].start_time - phoneme_list[index].start_time
        except IndexError:
            return 1.0

    def __call__(self, phoneme_list, index):
        curr = index + self.offset
        prev = curr - 1
        start_time = phoneme_list[curr].start_time

        combined_duration = self.get_duration(curr, phoneme_list)
        blend_out = combined_duration
        blend_in = combined_duration

        # Extend the event backwards over the predecessor, if there is one.
        # This must be an explicit bounds check: a negative index does not
        # raise IndexError, it silently wraps around to the end of the list.
        if prev >= 0:
            # NOTE(review): id 0 is presumably the silence phoneme - confirm.
            if phoneme_list[prev].id == 0:
                combined_duration += RAMP_IN
                start_time -= RAMP_IN
                blend_in = RAMP_IN
            else:
                prev_duration = self.get_duration(prev, phoneme_list)
                combined_duration += prev_duration
                start_time = phoneme_list[prev].start_time
                blend_in = prev_duration

        mag_scale = 1
        dur_scale = combined_duration
        # Blend times are stored as fractions of the whole event duration.
        blend_in /= combined_duration
        blend_out /= combined_duration

        cached_event_list.append(
            (start_time,
             self.anim_name if self.anim_name else phoneme_list[curr].facefx_coding(),
             dur_scale, mag_scale, blend_in, blend_out))


class RemoveConsonant(Action):
    """ Removes the consonant at index.

    The removed time goes to the successor, unless the successor has id 0, in
    which case it goes to the predecessor instead.
    - deletion_functor is stored but not used by this class.
    """

    def __init__(self, offset, deletion_functor=None):
        self.offset = offset
        self.deletion_functor = deletion_functor

    def description(self):
        # The template is a %-format string, so it needs the % operator;
        # str.format() would leave the "%d" placeholder in the output.
        return "Removing consonant at %d" % self.offset

    def __call__(self, phoneme_list, index):
        following = index + self.offset + 1
        # NOTE(review): id 0 is presumably the silence phoneme - confirm.
        if phoneme_list[following].id == 0:
            action = DeleteGiveToPreceeding(self.offset)
        else:
            action = DeleteGiveToSucceeding(self.offset)
        action(phoneme_list, index)


class ReplaceWithEvent(Action):
    """ Queues an onset event for the consonant at index, then removes it.

    - deletion_functor is stored but not used by this class.
    """

    def __init__(self, offset, deletion_functor=None):
        self.offset = offset
        self.deletion_functor = deletion_functor

    def description(self):
        return "Replacing consonant with event, removing consonant at %d" % self.offset

    def __call__(self, phoneme_list, index):
        # The event must be queued first, while the phoneme still exists.
        DropEventAtOnset(self.offset)(phoneme_list, index)
        RemoveConsonant(self.offset)(phoneme_list, index)


class Span(Action):
    """ Removes the indicated phoneme, splitting its duration between its
    predecessor and successor.

    The successor's start time is moved to the midpoint of the removed
    phoneme's time span.
    """

    def __init__(self, offset):
        self.offset = offset

    def description(self):
        return "Removing %d, spanning with predecessor and successor." % self.offset

    def __call__(self, phoneme_list, index):
        curr = index + self.offset
        first_start = phoneme_list[curr].start_time
        duration = phoneme_list[curr + 1].start_time - first_start
        midpoint = first_start + (duration * 0.5)
        del phoneme_list[curr]
        # The former successor now sits at curr.
        phoneme_list[curr].start_time = midpoint


class Test(object):
    """ Determine if a given rule is applicable based on a condition. """

    def __call__(self, phoneme_list, index):
        raise RuntimeError("Unimplemented __call__")


class IsShort(Test):
    """ Restrict to short versions of the specified phoneme (weight < 0.5). """

    def __init__(self, offset):
        self.offset = offset

    def __call__(self, phoneme_list, index):
        return phoneme_list[index + self.offset].weight < 0.5


class IsNotLong(Test):
    """ Restricts to short or normal versions of the specified phoneme
    (weight < 0.7). """

    def __init__(self, offset):
        self.offset = offset

    def __call__(self, phoneme_list, index):
        return phoneme_list[index + self.offset].weight < 0.7


class IsNotFirst(Test):
    """ Restricts to phonemes that are not the first in the list. """

    def __init__(self, offset):
        self.offset = offset

    def __call__(self, phoneme_list, index):
        return index + self.offset != 0


class DurationGreaterThan(Test):
    """ Restrict to phonemes lasting at least the specified duration.

    The comparison is inclusive (duration >= min_duration).
    """

    def __init__(self, offset, min_duration):
        self.offset = offset
        self.min_duration = min_duration

    def __call__(self, phoneme_list, index):
        curr = index + self.offset
        duration = phoneme_list[curr + 1].start_time - phoneme_list[curr].start_time
        return duration >= self.min_duration


class DurationLessThan(Test):
    """ Restrict to phonemes lasting at most the specified duration.

    The comparison is inclusive (duration <= min_duration); despite its name,
    min_duration acts as the upper bound here.
    """

    def __init__(self, offset, min_duration):
        self.offset = offset
        self.min_duration = min_duration

    def __call__(self, phoneme_list, index):
        curr = index + self.offset
        duration = phoneme_list[curr + 1].start_time - phoneme_list[curr].start_time
        return duration <= self.min_duration


# Helper classes

class ArchetypeSet(object):
    """ A set of archetypes that can be inversed. """

    def __init__(self, archetype_list):
        """ Initializes the set with the list of archetypes.

        - archetype_list is a python list of archetype names, or a single
          archetype name.
        """
        if isinstance(archetype_list, list):
            self.archetype_set = set(archetype_list)
        else:
            self.archetype_set = set([archetype_list])

    def __str__(self):
        """ Returns a human-readable string format. """
        return ' '.join(self.archetype_set)

    def __repr__(self):
        """ Returns the python representation. """
        return 'ArchetypeSet(["%s"])' % ','.join(self.archetype_set)

    def __contains__(self, phoneme):
        """ Returns true if the set contains the given phoneme.

        - phoneme is either a PhonemeInList (matched on its archetype) or a
          plain archetype name.
        """
        if isinstance(phoneme, PhonemeInList):
            return phoneme.archetype in self.archetype_set
        return phoneme in self.archetype_set

    def inverse(self):
        """ Returns the set of archetypes not in this set. """
        return ArchetypeSet([a for a in ALL_ARCHETYPES if a not in self])


class PhonemeSet(object):
    """ A set of phonemes, matched on their FaceFX coding. """

    def __init__(self, phoneme_list):
        """ Initializes the set with the list of phonemes.

        - phoneme_list is a python list, not a PhonemeList type. A single
          coding string is also accepted.
        """
        if isinstance(phoneme_list, list):
            self.phoneme_set = set(phoneme_list)
        else:
            self.phoneme_set = set([phoneme_list])

    def __str__(self):
        return ' '.join(self.phoneme_set)

    def __repr__(self):
        return 'PhonemeSet(["%s"])' % ','.join(self.phoneme_set)

    def __contains__(self, phoneme):
        return phoneme.facefx_coding() in self.phoneme_set


class Rule(object):
    """ A rule to be applied to the phoneme list. """

    def __init__(self, set_list, action_list, test_list = None):
        """ Create the rule.

        - set_list is the ordered list of phoneme sets that must match for the
          rule to be applied. Entries that are neither ArchetypeSet nor
          PhonemeSet are wrapped in a PhonemeSet.
        - action_list is the ordered list of actions that will be taken when
          the rule is applied.
        - test_list is the ordered list of tests that must pass for the actions
          to be taken.

        Each argument may also be a single item instead of a list.
        """
        raw_sets = set_list if isinstance(set_list, list) else [set_list]
        # Build a new list rather than rewriting the caller's list in place.
        self.set_list = [
            s if isinstance(s, (ArchetypeSet, PhonemeSet)) else PhonemeSet(s)
            for s in raw_sets]

        if isinstance(action_list, list):
            self.action_list = action_list
        else:
            self.action_list = [action_list]

        if isinstance(test_list, list):
            self.test_list = test_list
        else:
            self.test_list = [test_list] if test_list is not None else None

    def _matches(self, phoneme_list, index):
        """ Returns True if every set and every test accepts the phonemes
        starting at index. Running off the end of the list is a non-match. """
        try:
            for i, phoneme_set in enumerate(self.set_list):
                if phoneme_list[index + i] not in phoneme_set:
                    return False
            for test in self.test_list or []:
                if not test(phoneme_list, index):
                    return False
        except IndexError:
            return False
        return True

    def __call__(self, phoneme_list, index, restrict_to_events=False):
        """ Applies the rule to the phoneme list at index if the list matches
        the set list.

        - restrict_to_events is currently unused.

        Returns True if the rule matched and its actions were run.
        """
        if not self._matches(phoneme_list, index):
            return False

        num_to_match = len(self.set_list)
        log_debug(' Applying rule at offset %d' % index)
        log_debug(' To phonemes: %s' % str(phoneme_list[index:index+num_to_match]))
        for action in self.action_list:
            log_debug(' Action: %s' % action.description())
            action(phoneme_list, index)
        log_debug(' Result: %s' % str(phoneme_list[index:index+num_to_match]))
        return True


SIL = ['SIL']
VOWELS = ['IY', 'E', 'A', 'UW', 'IH', 'ER', 'UY']
STATIC_CONSONANTS = ['B', 'V']
LIQUID_CONSONANTS = ['R', 'Y', 'W']
TONGUE_CONSONANTS = ['T', 'N', 'H', 'K']
EXTRUDED_CONSONANTS = ['CH']
FRICATIVE_CONSONANTS = ['S', 'TH']
NARROW_VOWELS = ['UW', 'UY']
WIDE_VOWELS = ['IY', 'E', 'IH']
EXTRUDED_VOWELS = ['ER']
NEUTRAL_VOWELS = ['A']

# Note: SIL is rebound here from the plain list above to an ArchetypeSet.
SIL = ArchetypeSet(SIL)
NOT_SIL = SIL.inverse()
TONGUE = ArchetypeSet(TONGUE_CONSONANTS)
NOT_A_SET = ArchetypeSet('A').inverse()
FRICATIVE = ArchetypeSet(FRICATIVE_CONSONANTS)
VOWEL = ArchetypeSet(VOWELS)
STATIC_CONSONANT = ArchetypeSet(STATIC_CONSONANTS)
NARROW_VOWEL = ArchetypeSet(NARROW_VOWELS)
WIDE_VOWEL = ArchetypeSet(WIDE_VOWELS)
EXTRUDED_VOWEL = ArchetypeSet(EXTRUDED_VOWELS)
NEUTRAL_VOWEL = ArchetypeSet(NEUTRAL_VOWELS)

# Single-phoneme replacements. Every rule is tried at every index.
REPLACEMENT_RULES = [
    Rule('EY', Split(0, 'EH', 'IY'), DurationGreaterThan(0, SPLIT_DURATION)),
    Rule('AW', Split(0, 'AE', 'UH'), DurationGreaterThan(0, SPLIT_DURATION)),
    Rule('AY', Split(0, 'AE', 'IY'), DurationGreaterThan(0, SPLIT_DURATION)),
    Rule('OY', Split(0, 'AO', 'IY'), DurationGreaterThan(0, SPLIT_DURATION)),
    Rule('OW', Split(0, 'AO', 'UH'), DurationGreaterThan(0, SPLIT_DURATION)),
    Rule('SIL', Span(0), [IsNotFirst(0), DurationLessThan(0, SHORT_SILENCE)]),
    Rule('FLAP', SetID(0, 'T')),
    Rule(ArchetypeSet('H'), DeleteGiveToSucceeding(0))
]

# Context-sensitive rules. Only the first matching rule is applied per index,
# so order matters.
# NOTE(review): the second [TONGUE, TONGUE, TONGUE] rule has the same match
# sets and no tests, so the earlier one always wins and it never runs.
RULE_LIST = [
    Rule([NOT_SIL, ArchetypeSet('K'), NOT_SIL],
         [DropEventAtOnset(1, 'K'), Span(1)]),
    Rule([SIL, TONGUE, SIL],
         [DropEventAtOnset(1), SetID(1, 'AA')]),
    Rule([VOWEL, TONGUE, TONGUE, ArchetypeSet(STATIC_CONSONANTS+FRICATIVE_CONSONANTS)],
         [DropEventAtOnset(1), DropEventAtOnset(2), RemoveConsonant(1), RemoveConsonant(1)]),
    Rule([TONGUE, TONGUE, TONGUE],
         [DropEventAtOnset(0), DropEventAtOnset(1), RemoveConsonant(1), RemoveConsonant(0)]),
    Rule([NOT_SIL, TONGUE, ArchetypeSet(STATIC_CONSONANTS+FRICATIVE_CONSONANTS+['ER'])],
         [DropEventAtOnset(1), RemoveConsonant(1)]),
    Rule([FRICATIVE, TONGUE, SIL],
         [DropEventAtOnset(1), SetID(1, 'AA'), SetDuration(1, MIN_DURATION)]),
    Rule([NOT_A_SET, TONGUE, SIL],
         [DropEventAtOnset(1), SetID(1, 'AA')]),
    Rule([TONGUE, TONGUE, TONGUE],
         [DropEventAtOnset(1), SetID(1, 'AA'), DropEventAtOnset(2), DropEventAtOnset(0),
          RemoveConsonant(2), RemoveConsonant(0)]),
    Rule([TONGUE, NOT_SIL],
         [DropEventAtOnset(0), RemoveConsonant(0)]),
    Rule([ArchetypeSet(['R']), ArchetypeSet(['IY', 'IH'])],
         [DropEventAtOnset(0), RemoveConsonant(0)], IsNotLong(1)),
    Rule([ArchetypeSet(['SIL', 'UW']).inverse(), ArchetypeSet(['R', 'W']),
          ArchetypeSet(['SIL', 'UW']).inverse()],
         [SetID(1, 'UH')], IsNotLong(1)),
    Rule([ArchetypeSet(['SIL', 'ER']).inverse(), ArchetypeSet(['Y']),
          ArchetypeSet(['SIL', 'ER']).inverse()],
         [SetID(1, 'ER')], IsNotLong(1)),
    Rule([ArchetypeSet(FRICATIVE_CONSONANTS), ArchetypeSet(['IH'])],
         [DeleteGiveToSucceeding(0)], [IsShort(0)]),
    Rule([ArchetypeSet(['Y']), ArchetypeSet(['UW'])],
         [DropEventAtOnset(0), RemoveConsonant(0)], IsNotLong(1)),
    Rule([ArchetypeSet(['S', 'IH', 'B', 'V']), SIL],
         [InsertBefore(1, 'AA', MIN_DURATION)]),
    Rule([SIL, ArchetypeSet(STATIC_CONSONANTS+FRICATIVE_CONSONANTS)],
         [InsertAfter(0, 'AA', MIN_DURATION)])
]


def apply_rules(plist):
    """ Apply each rule in the specified order.

    Parameters:
    - plist: The phoneme list from Studio

    Returns a new PhonemeList built from the reduced phonemes and the original
    duration. Events produced by the rules are appended to cached_event_list.
    """
    phoneme_list = plist.phonemes

    set_overall_progress(.1)
    set_task_name('Processing replacement phonemes...')
    # First, process single-phone replacements. The list length is re-read on
    # every iteration because rules insert and delete entries; a range() fixed
    # up front would skip the tail whenever a phoneme has been split.
    i = 0
    while i < len(phoneme_list):
        log_debug(str(phoneme_list))
        for rule in REPLACEMENT_RULES:
            rule(phoneme_list, i)
        set_task_progress(float(i) / max(len(phoneme_list), 1))
        i += 1

    set_overall_progress(.3)
    set_task_name('Filtering phoneme groups...')
    # Next, apply very specific rules, generally involving certain phonemes.
    # Only the first matching rule is applied at each index, then we move on.
    # (The original decremented the loop variable of a for loop here, which
    # had no effect on iteration; rules are deliberately NOT re-applied at the
    # same index, since that is not known to terminate for every rule.)
    i = 0
    while i < len(phoneme_list):
        log_debug(str(phoneme_list))
        for rule in RULE_LIST:
            if rule(phoneme_list, i):
                break
        set_task_progress(float(i) / max(len(phoneme_list), 1))
        i += 1

    set_overall_progress(.5)
    set_task_name('Merging similar phonemes...')
    # Reassign archetypes before merging adjacents.
    templist = PhonemeList(phoneme_list, plist.duration())
    templist.assign_archetypes(SIMPLIFIED_MAPPING)
    phoneme_list = templist.phonemes

    # One final pass to merge any adjacent phones of the same archetype. After
    # a deletion we stay on the same index so that runs of three or more equal
    # archetypes collapse completely.
    i = 0
    while i < len(phoneme_list) - 1:
        set_task_progress(float(i) / len(phoneme_list))
        if phoneme_list[i].archetype == phoneme_list[i + 1].archetype:
            del phoneme_list[i + 1]
        else:
            i += 1

    set_overall_progress(.7)
    return PhonemeList(phoneme_list, plist.duration())


def to_studio(plist, wlist):
    """ Put the phoneme list back in Studio.

    Parameters:
    - plist: The phoneme list after applying the rules.
    - wlist: The word list loaded from the animation; used to regroup the new
      phonemes into words.
    """
    # word matching code adapted from Doug's original FunnelPhonemes.py
    issueCommand('batch')
    issueCommand('phonList -clear;')

    word_index = 0
    word_start_index = 0
    word = ''
    if len(wlist):
        word = unicode(wlist[word_index].text)

    set_task_name('Updating animation...')
    total = len(plist)
    for phoneme_index, p in enumerate(plist):
        issueCommand('phonList -append -phoneme "{0}" -startTime "{1}" -endTime "{2}"'.format(
            p.facefx_coding(), p.start_time, p.end_time))

        if word_index < len(wlist):
            if p.end_time - wlist[word_index].end_time > EPSILON:
                # If we have passed a word boundary without finding an
                # equivalent phoneme boundary, then the phoneme boundary was
                # eliminated, and we join the words.
                if word_index + 1 < len(wlist):
                    word_index += 1
                    word = unicode(word + ' ' + wlist[word_index].text)
                elif word != u'':
                    group_to_word(word_start_index, phoneme_index, word)

            if abs(p.start_time - wlist[word_index].start_time) < EPSILON:
                # This phoneme opens the current word.
                word = unicode(wlist[word_index].text)
                word_start_index = phoneme_index

            if abs(p.end_time - wlist[word_index].end_time) < EPSILON:
                # This phoneme closes the current word.
                group_to_word(word_start_index, phoneme_index, word)
                word = u''
                word_index += 1

        set_task_progress(float(phoneme_index) / total)

    set_overall_progress(.9)
    set_task_name('')
    set_task_progress(1)
    issueCommand('execBatch -editedcurves -changedanimation')


def apply_cached_events(event_list):
    """ Adds the events that were created while applying rules.

    - event_list is a list of (start_time, anim, dur_scale, mag_scale,
      blend_in, blend_out) tuples as queued by DropEventAtOnset. Every event
      is shifted earlier by EVENT_SHIFT seconds.
    """
    # Only issueCommand is imported from FxStudio at file level, so bind the
    # module itself here for the qualified calls below.
    import FxStudio

    group = 'SpeechSnippets'
    animGroup = FxStudio.getSelectedAnimGroupName()
    animation = FxStudio.getSelectedAnimName()

    issueCommand('batch')
    set_task_name('Adding events...')
    total = len(event_list)
    for i, e in enumerate(event_list):
        start_time, anim, dur_scale, mag_scale, blend_in, blend_out = e
        issueCommand(
            'event -group "%s" -anim "%s" -add -eventgroup "%s" -eventanim "%s" -start "%0.3f" -duration "%0.3f" -magnitude "%0.3f" -blendin "%0.3f" -blendout "%0.3f"'
            % (animGroup, animation, group, anim, start_time - EVENT_SHIFT,
               dur_scale, mag_scale, blend_in, blend_out))
        set_task_progress(float(i) / total)
    set_overall_progress(.8)
    issueCommand('execBatch -changedanimation')


def reduce_selected_animation():
    """ Reduces the phonemes in the currently selected animation.

    Shows an error box and returns early when no animation is loaded. The
    progress display is always closed, even if the reduction fails.
    """
    global cached_event_list
    # Only issueCommand is imported from FxStudio at file level, so bind the
    # module itself here for the qualified call below.
    import FxStudio

    print('Reducing phonemes...')
    animpath = FxHelperLibrary.get_selected_animpath()
    if not FxHelperLibrary.anim_exists(animpath):
        FxStudio.errorBox('There is no animation loaded.')
        return

    # NOTE(review): beginProgressDisplay/endProgressDisplay are not imported
    # in this file; presumably the host environment provides them - confirm.
    try:
        beginProgressDisplay('Reducing phonemes...', 2)
        plist = PhonemeList.load_from_anim(animpath)
        wlist = WordList.load_from_anim(animpath)
        plist.assign_archetypes(SIMPLIFIED_MAPPING)
        plist.score()
        start_count = len(plist)

        # Start from an empty event queue for this run.
        cached_event_list = []
        new_list = apply_rules(plist)
        apply_cached_events(cached_event_list)
        to_studio(new_list, wlist)
        end_count = len(new_list)

        fix_plateaus(animpath)
        print('Finished: reduced %d phonemes to %d.' % (start_count, end_count))
    finally:
        endProgressDisplay()


# Module-level preconditions, checked whenever the script is loaded.
# NOTE(review): getSelectedAnimation, errorBox and isNoSave are not imported in
# this file; presumably the host environment provides them - confirm.
if getSelectedAnimation()[1] == "":
    errorBox("No animation selected! This script requires a selected animation to run.")
    raise RuntimeError("No animation is selected. Can not run script.")

if isNoSave():
    errorBox("This script can not be run from the no-save version of FaceFx Studio. It relies on being able to get the phoneme word list from python and this feature is disabled.")
    raise RuntimeError("No-save version can not run FxAdvancedFunnel script.")

# Script execution.
if __name__ == "__main__":
    # Run the reduction only when executed as a script, not when imported.
    reduce_selected_animation()