| |
| import test_appliance, sys, StringIO |
| |
| from yaml import * |
| import yaml |
| |
class TestEmitter(test_appliance.TestAppliance):
    """Round-trip tests for the emitter, driven by YAML source files.

    A file is parsed into events, the events are fed to an Emitter writing
    into an in-memory buffer, and the emitted text is parsed again.  The
    two event sequences are then compared.
    """

    def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
        """Round-trip the data file; canonical_filename is not used here."""
        self._testEmitter(test_name, data_filename)

    def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
        """Round-trip the canonical file with canonical output forced off."""
        self._testEmitter(test_name, canonical_filename, False)

    def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
        """Round-trip the canonical file with canonical output forced on."""
        self._testEmitter(test_name, canonical_filename, True)

    def _testEmitter(self, test_name, filename, canonical=None):
        """Parse filename, emit its events, re-parse and compare.

        test_name -- not used by this method.
        filename  -- path of the YAML file to round-trip.
        canonical -- when not None, assigned to the `canonical` attribute
                     of the first parsed event before emitting.
        """
        # The event list is fully materialised before the handle is closed,
        # so nothing reads from the stream afterwards.
        stream = file(filename, 'rb')
        try:
            events = list(iter(Parser(Scanner(Reader(stream)))))
        finally:
            stream.close()
        if canonical is not None:
            events[0].canonical = canonical
        #self._dump(filename, events)
        writer = StringIO.StringIO()
        emitter = Emitter(writer)
        for event in events:
            emitter.emit(event)
        data = writer.getvalue()
        new_events = list(parse(data))
        # zip() stops at the shorter sequence, so without this check events
        # that were dropped or added by the round trip would go unnoticed.
        self.failUnlessEqual(len(events), len(new_events))
        for event, new_event in zip(events, new_events):
            self.failUnlessEqual(event.__class__, new_event.__class__)
            if isinstance(event, NodeEvent):
                self.failUnlessEqual(event.anchor, new_event.anchor)
            if isinstance(event, CollectionStartEvent):
                self.failUnlessEqual(event.tag, new_event.tag)
            if isinstance(event, ScalarEvent):
                #self.failUnlessEqual(event.implicit, new_event.implicit)
                # Tags are only compared when neither side is implicit.
                if not event.implicit and not new_event.implicit:
                    self.failUnlessEqual(event.tag, new_event.tag)
                self.failUnlessEqual(event.value, new_event.value)

    def _dump(self, filename, events):
        """Debugging aid: print the source file, then emit events to stdout."""
        writer = sys.stdout
        emitter = Emitter(writer)
        print("="*30)
        print("ORIGINAL DOCUMENT:")
        source = file(filename, 'rb')
        try:
            print(source.read())
        finally:
            source.close()
        print('-'*30)
        print("EMITTED DOCUMENT:")
        for event in events:
            emitter.emit(event)

# Only the data-file variant is registered.  add_tests comes from
# test_appliance.TestAppliance; presumably it generates one test per set of
# files with the given extensions -- confirm against test_appliance.
TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
# NOTE(review): the two canonical variants below are disabled; the reason is
# not recorded in this file.
#TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
#TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
| |
class EventsConstructor(Constructor):
    """Constructor that turns tagged nodes into yaml event objects.

    The node tag, minus its first character, names the event class:
    a tag of `!Scalar` yields `yaml.ScalarEvent`, and so on.
    """

    def construct_event(self, node):
        """Build the event object described by node.

        Non-scalar nodes supply the keyword arguments for the event as a
        mapping; scalar nodes supply none.  Missing `anchor`, `tag` and
        `value` arguments are filled with defaults for the event classes
        that are given them here.
        """
        if not isinstance(node, ScalarNode):
            kwargs = self.construct_mapping(node)
        else:
            kwargs = {}
        event_name = str(node.tag[1:])+'Event'
        is_scalar = event_name == 'ScalarEvent'
        is_collection_start = event_name in ('SequenceStartEvent',
                                             'MappingStartEvent')
        if is_scalar or is_collection_start or event_name == 'AliasEvent':
            kwargs.setdefault('anchor', None)
        if is_scalar or is_collection_start:
            kwargs.setdefault('tag', None)
        if is_scalar:
            kwargs.setdefault('value', '')
        event_class = getattr(yaml, event_name)
        return event_class(**kwargs)

# Registered under the tag None -- presumably the fallback used for any tag
# without a dedicated constructor; confirm against Constructor.add_constructor.
EventsConstructor.add_constructor(None, EventsConstructor.construct_event)
| |
class TestEmitterEvents(test_appliance.TestAppliance):
    """Round-trip tests for the emitter, driven by event description files.

    An events file is loaded with EventsConstructor into a list of event
    objects, the events are emitted into an in-memory buffer, and the
    emitted text is parsed back and compared with the original events.
    """

    def _testEmitterEvents(self, test_name, events_filename):
        """Load events from events_filename, emit, re-parse and compare.

        test_name       -- not used by this method.
        events_filename -- path of the file describing the events.
        """
        # The event list is fully materialised before the handle is closed,
        # so nothing reads from the stream afterwards.
        stream = file(events_filename, 'rb')
        try:
            events = list(load(stream, Constructor=EventsConstructor))
        finally:
            stream.close()
        #self._dump(events_filename, events)
        writer = StringIO.StringIO()
        emitter = Emitter(writer)
        for event in events:
            emitter.emit(event)
        data = writer.getvalue()
        new_events = list(parse(data))
        # zip() below stops at the shorter sequence, so lengths are
        # compared first.
        self.failUnlessEqual(len(events), len(new_events))
        for event, new_event in zip(events, new_events):
            self.failUnlessEqual(event.__class__, new_event.__class__)
            if isinstance(event, NodeEvent):
                self.failUnlessEqual(event.anchor, new_event.anchor)
            if isinstance(event, CollectionStartEvent):
                self.failUnlessEqual(event.tag, new_event.tag)
            if isinstance(event, ScalarEvent):
                # Passes when either the implicit flags or the tags agree.
                self.failUnless(event.implicit == new_event.implicit
                        or event.tag == new_event.tag)
                self.failUnlessEqual(event.value, new_event.value)

    def _dump(self, events_filename, events):
        """Debugging aid: print the events file, then emit events to stdout."""
        writer = sys.stdout
        emitter = Emitter(writer)
        print("="*30)
        print("EVENTS:")
        source = file(events_filename, 'rb')
        try:
            print(source.read())
        finally:
            source.close()
        print('-'*30)
        print("OUTPUT:")
        for event in events:
            emitter.emit(event)

# add_tests comes from test_appliance.TestAppliance; presumably it generates
# one test per file with the given extension -- confirm against test_appliance.
TestEmitterEvents.add_tests('testEmitterEvents', '.events')
| |