# blob: eb0910d6fadec8a45b87bfe0eaa83868a32cf741
import test_appliance, sys, StringIO
from yaml import *
import yaml
class TestEmitter(test_appliance.TestAppliance):
    """Round-trip tests for ``Emitter`` driven by YAML files.

    Each test parses a file into a list of events, emits those events
    into an in-memory buffer, parses the emitted text again and compares
    the two event lists.
    """

    def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
        """Round-trip ``data_filename``; ``canonical_filename`` is not used."""
        self._testEmitter(test_name, data_filename)

    def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
        """Round-trip ``canonical_filename`` with ``canonical`` set to False."""
        self._testEmitter(test_name, canonical_filename, False)

    def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
        """Round-trip ``canonical_filename`` with ``canonical`` set to True."""
        self._testEmitter(test_name, canonical_filename, True)

    def _testEmitter(self, test_name, filename, canonical=None):
        """Parse ``filename``, emit its events, re-parse and compare.

        ``test_name`` is accepted but not used here.  When ``canonical``
        is not None it is stored on the ``canonical`` attribute of the
        first parsed event before emitting (presumably the stream-start
        event -- confirm against the parser).
        """
        # Close the input file deterministically instead of leaking the
        # handle; the event list is fully built before the file is closed.
        stream = open(filename, 'rb')
        try:
            events = list(iter(Parser(Scanner(Reader(stream)))))
        finally:
            stream.close()
        if canonical is not None:
            events[0].canonical = canonical
        # Uncomment to print the original and the emitted document.
        #self._dump(filename, events)
        writer = StringIO.StringIO()
        emitter = Emitter(writer)
        for event in events:
            emitter.emit(event)
        data = writer.getvalue()
        new_events = list(parse(data))
        # zip() stops at the shorter list, so without this check missing or
        # extra trailing events in the emitted document would go unnoticed.
        self.failUnlessEqual(len(events), len(new_events))
        for event, new_event in zip(events, new_events):
            self.failUnlessEqual(event.__class__, new_event.__class__)
            if isinstance(event, NodeEvent):
                self.failUnlessEqual(event.anchor, new_event.anchor)
            if isinstance(event, CollectionStartEvent):
                self.failUnlessEqual(event.tag, new_event.tag)
            if isinstance(event, ScalarEvent):
                #self.failUnlessEqual(event.implicit, new_event.implicit)
                # Tags are compared only when neither event is implicit.
                if not event.implicit and not new_event.implicit:
                    self.failUnlessEqual(event.tag, new_event.tag)
                self.failUnlessEqual(event.value, new_event.value)

    def _dump(self, filename, events):
        """Debug helper: print ``filename``, then emit ``events`` to stdout."""
        writer = sys.stdout
        emitter = Emitter(writer)
        print("="*30)
        print("ORIGINAL DOCUMENT:")
        # Read through an explicitly closed handle rather than leaking it.
        stream = open(filename, 'rb')
        try:
            print(stream.read())
        finally:
            stream.close()
        print('-'*30)
        print("EMITTED DOCUMENT:")
        for event in events:
            emitter.emit(event)
# Register the data-file round-trip test.  NOTE(review): add_tests is defined
# in test_appliance (not visible here); presumably it creates one test per set
# of fixture files carrying the listed extensions -- confirm there.
TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
# The two canonical-file variants below are currently disabled; their helper
# methods (_testEmitterOnCanonicalNormally / _testEmitterOnCanonicalCanonically)
# are still defined on TestEmitter.
#TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
#TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
class EventsConstructor(Constructor):
    """Constructor whose ``construct_event`` builds yaml event objects."""

    def construct_event(self, node):
        """Return the event object described by ``node``.

        The event class is looked up on the ``yaml`` module under the name
        formed by dropping the first character of ``node.tag`` and
        appending ``'Event'``.  A scalar node supplies no arguments; any
        other node is converted with ``construct_mapping`` and its items
        are passed as keyword arguments.  Missing ``anchor``, ``tag`` and
        ``value`` arguments are filled in with defaults for the event
        classes that take them.
        """
        kwargs = {}
        if not isinstance(node, ScalarNode):
            kwargs = self.construct_mapping(node)
        event_name = str(node.tag[1:]) + 'Event'

        is_scalar = event_name == 'ScalarEvent'
        is_collection_start = event_name in ('SequenceStartEvent',
                                             'MappingStartEvent')
        takes_tag = is_scalar or is_collection_start
        takes_anchor = takes_tag or event_name == 'AliasEvent'

        if takes_anchor:
            kwargs.setdefault('anchor', None)
        if takes_tag:
            kwargs.setdefault('tag', None)
        if is_scalar:
            kwargs.setdefault('value', '')

        event_class = getattr(yaml, event_name)
        return event_class(**kwargs)
# Register construct_event under the tag None.  NOTE(review): presumably None
# makes it the fallback for every tag without a dedicated constructor --
# confirm against yaml's Constructor.add_constructor.
EventsConstructor.add_constructor(None, EventsConstructor.construct_event)
class TestEmitterEvents(test_appliance.TestAppliance):
    """Round-trip tests for ``Emitter`` driven by event description files.

    The events are loaded from a file with ``EventsConstructor``, emitted
    into an in-memory buffer, and the emitted text is parsed again and
    compared with the loaded events.
    """

    def _testEmitterEvents(self, test_name, events_filename):
        """Load events from ``events_filename``, emit, re-parse and compare.

        ``test_name`` is accepted but not used here.
        """
        # Close the input file deterministically instead of leaking the
        # handle; list() consumes everything load() produces before the
        # file is closed.
        stream = open(events_filename, 'rb')
        try:
            events = list(load(stream, Constructor=EventsConstructor))
        finally:
            stream.close()
        # Uncomment to print the events file and the emitted output.
        #self._dump(events_filename, events)
        writer = StringIO.StringIO()
        emitter = Emitter(writer)
        for event in events:
            emitter.emit(event)
        data = writer.getvalue()
        new_events = list(parse(data))
        # Checked up front because zip() below stops at the shorter list.
        self.failUnlessEqual(len(events), len(new_events))
        for event, new_event in zip(events, new_events):
            self.failUnlessEqual(event.__class__, new_event.__class__)
            if isinstance(event, NodeEvent):
                self.failUnlessEqual(event.anchor, new_event.anchor)
            if isinstance(event, CollectionStartEvent):
                self.failUnlessEqual(event.tag, new_event.tag)
            if isinstance(event, ScalarEvent):
                # Passes when either the implicit flags or the tags agree.
                self.failUnless(event.implicit == new_event.implicit
                        or event.tag == new_event.tag)
                self.failUnlessEqual(event.value, new_event.value)

    def _dump(self, events_filename, events):
        """Debug helper: print the events file, then emit ``events`` to stdout."""
        writer = sys.stdout
        emitter = Emitter(writer)
        print("="*30)
        print("EVENTS:")
        # Read through an explicitly closed handle rather than leaking it.
        stream = open(events_filename, 'rb')
        try:
            print(stream.read())
        finally:
            stream.close()
        print('-'*30)
        print("OUTPUT:")
        for event in events:
            emitter.emit(event)
# Register the events round-trip test.  NOTE(review): add_tests is defined in
# test_appliance (not visible here); presumably it creates one test per
# fixture file with the '.events' extension -- confirm there.
TestEmitterEvents.add_tests('testEmitterEvents', '.events')