Home > Object Persistency API > API Reference > Streaming
OOMEGA supports data streams. The streaming API is e.g. used for serialisation purposes, cp. chapter SDF and SDML. Its implementation is part of the Core project and filed in the package org.oomega.streaming.
Sequences
Streams are subdivided into one or more sequences. The nature of sequences is that all pointers contained within transferred entities can be dereferenced at the end of a sequence. Let's discuss the two cases:
- Bidirectional associations: if a transferred entity object is involved in a bidirectional association, the referenced entity object must be part of the same sequence. Otherwise the end of the sequence could not be reached, because of a dangling reference.
- Unidirectional associations: if a transferred entity object is involved in an unidirectional association, the referenced entity need not to be in the same sequence. If this is the case, the referenced entity must have been transmitted already in a previous sequence.
One prominent use case for sequences is storage and transmission of metamodel and model information together. The metamodel can be transmitted in the first sequence. The second sequence can be dynamically interpreted due to the already received metamodel information.
Interfaces and Classes
In general data sources and data sinks are distinguished. The main interfaces you have to deal with are called EOInputStream and EOOutputStream.
EOInputStream / EOOutputStream
package org.oomega.streaming; public interface EOInputStream<EO extends EntityObject> { public boolean isBeginOfSequence(); public boolean isEndOfSequence(); public boolean hasNext(); public EO getNext(); }
- public boolean isBeginOfSequence(): returns true, if the stream is at the beginning of a sequence. The next read entity object is the first one of the new sequence. At the beginning of a stream there's always a sequence started.
- public boolean isEndOfSequence(): returns true, if the stream is at the end of a sequence. The last read entity object was the last one of the sequence. At the end of a stream there's always the end of the current sequence reached.
- public boolean hasNext(): returns true, if there's another entity object available in the stream. The end of the stream hasn't been reached yet.
- pubic EO getNext(): returns the next entity object of the stream or null if the end of the stream has been reached already.
package org.oomega.streaming; public interface EOOutputStream<EO extends EntityObject> { public void putNext(EO eo); public void putNext(Collection<? extends EO> eos); public void startNextSequence(); public void close(); }
- public void putNext(EO eo): writes an entity object to the stream. If the stream has been closed already, this call yields to an exception.
- public void putNext(Collection<? extends EO> eos): writes a collection of entity objects to the stream. The order is defined by the collection. If the stream has been closed already, this call yields to an exception.
- public void startNextSequence(): closes the current and opens a new sequence. Empty sequences are not written to the stream. Successive calls of this method have no effect. If the stream has been closed already, this call yields to an exception.
- public void close(): closes the stream. If the stream has been closed already, this call yields to an exception.
StreamingContext
The EOContainer interface extends the StreamingContext interface. Hence every entity object container (e.g. MemoryEOC, VersantEOC, ...) can serve as a data source or a data sink, i.e. you can stream from and to an entity object container.
package org.oomega.streaming; public interface StreamingContext { public EOInputStream<EntityObject> getEOInputStream(); public EOOutputStream<EntityObject> getEOOutputStream(); }
ActivePipe
An ActivePipe is initialised with an EOOutputStream and an EOInputStream. These streams are tied together with that pipe. As soon as the method run is called, the content is streamed from eoIn to eoOut.
package org.oomega.streaming; public class ActivePipe<EO extends EntityObject> implements Runnable { // constructor public ActivePipe(EOOutputStream<EO> eoOut, EOInputStream<? extends EO> eoIn) {} // runnable implementation public void run() {} }
EOInputCollection / EOOutputCollection
EOInputCollection and EOOutputCollection are concrete implementations of EOInputStream and EOOutputStream respectively.
package org.oomega.streaming; public class EOInputCollection<EO extends EntityObject> implements EOInputStream<EO> { // constructor public EOInputCollection(Collection<EO>... sequenceCollections) {} // EOInputStream implementation }
EOInputCollection can be initialised with one or more collections, i.e. sequences of entity objects.
package org.oomega.streaming; public class EOOutputCollection<EO extends EntityObject> implements EOOutputStream<EO> { // constructor public EOOutputCollection() {} // additional method public List<List<EO>> getCollections() {} // EOOutputStream implementation }
EOOutputCollection can be asked for the list of sequences that have been already written to the stream.
EOCInputStream / EOCOutputStream
EOCInputStream and EOCOutputStream are concrete implementations of EOInputStream and EOOutputStream respectively. They can be initialised with an entity object container session. The container serves as a data source or data sink.
package org.oomega.streaming; public class EOCInputStream implements EOInputStream<EntityObject> { // constructor public EOCInputStream(EOContainerSession eocSession) {} // EOInputStream implementation }
package org.oomega.streaming; public class EOCOutputStream implements EOOutputStream<EntityObject> { // constructor public EOCOutputStream(EOContainerSession eocSession) {} // EOOutputStream implementation }
EOISFilter / EOOSFilter
EOISFilter and EOOSFilter are concrete implementations of EOInputStream and EOOutputStream respectively. They can be initialised with an existing intput/output stream. Additionally there's the predicate filterBy provided in order to filter the entity objects read from or written to the stream. Predicates are discussed in chapter Query Language.
package org.oomega.streaming; public class EOISFilter<EO extends EntityObject> extends EOInputStreamDecorator<EO> { // constructor public EOISFilter(EOInputStream<? extends EntityObject> eoIn, Predicate filterBy, EOContainerSession eocSession) {} // EOInputStream implementation }
package org.oomega.streaming; public class EOOSFilter<EO extends EntityObject> extends EOOutputStreamDecorator<EO> { // constructor public EOOSFilter(EOOutputStream<EO> eoOut, Predicate filterBy, EOContainerSession eocSession) {} // EOOutputStream implementation }
EOOSDuplicateFilter
EOOSDuplicateFilter is a concrete implementation of EOOutputStream. It can be initialised with an existing output stream. Every object is written only once to the stream. Duplicates are omitted automatically.
package org.oomega.streaming; public class EOOSDuplicateFilter<EO extends EntityObject> extends EOOutputStreamDecorator<EO> { // constructor public EOOSDuplicateFilter(EOOutputStream<? extends EntityObject> eoOut) {} // EOOutputStream implementation }
EOOSCompositionCompleter
EOOSCompositionCompleter is a concrete implementation of EOOutputStream. It can be initialised with an existing output stream. If an entity object is written to the stream, the implementation automatically complements it with entity objects referenced from the original object via a composition. The following information has to be provided:
- int depth: if depth == 1, directly referenced entity objects are added; if depth == 2, referenced entity objects of directly referenced entity objects are added additionally; and so on.
- EOContainerSession eocSession: the entity object container session to ask for additionally required entity objects.
package org.oomega.streaming; public class EOOSCompositionCompleter extends EOOutputStreamDecorator<EntityObject> { // constructor public EOOSCompositionCompleter(EOOutputStream<EntityObject> eoOut, int depth, EOContainerSession eocSession) {} // EOOutputStream implementation }
Next chapter: Custom Sorter