Tutorial 3: Data flow

Begin, action, end

A PFA scoring engine processes a linear stream of data that has a beginning and possibly an end. Each datum in the stream has the same type and comes from the same source, so to combine data of different types or from different sources, create two or more scoring engines and connect them in a pipeline.

In some cases, you may want to perform special actions at the beginning and end of a data stream. PFA provides begin and end routines for this purpose, similar to the BEGIN and END blocks in awk.

The begin and end routines do not accept input and do not return output; they only manipulate persistent storage.
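To make the life cycle concrete, here is a minimal Python sketch, not PFA. The `CountingEngine` class is hypothetical: its attributes stand in for two cells, and the `run` function plays the role of the host. Begin and end take no input and return nothing, while each action call sees the values left behind by the previous one.

```python
from typing import Iterable, List


class CountingEngine:
    """Hypothetical Python model of a scoring engine's life cycle (not a PFA API).

    The attributes `count` and `status` stand in for two PFA cells.
    """

    def __init__(self) -> None:
        # Like PFA cells, each one has an initial value before begin runs.
        self.count = 0
        self.status = "new"

    def begin(self) -> None:
        # No input and no return value: only persistent storage changes.
        self.status = "running"

    def action(self, datum: str) -> int:
        # Runs once per datum; `count` survives from one call to the next.
        self.count += 1
        return self.count

    def end(self) -> None:
        # No input and no return value: only persistent storage changes.
        self.status = "finished"


def run(engine: CountingEngine, stream: Iterable[str]) -> List[int]:
    """Drive the engine as a host might: begin, one action per datum, then end."""
    engine.begin()
    outputs = [engine.action(datum) for datum in stream]
    engine.end()
    return outputs


engine = CountingEngine()
print(run(engine, ["hello", "world", "hello"]))  # [1, 2, 3]
print(engine.status)                              # finished
```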

Persistent storage

A PFA scoring engine has four types of persistent storage: cells and pools, both of which may be private or shared. These storage areas are like local symbols in that they store Avro-typed data, but unlike local symbols, they have global scope and keep their values between action invocations and from the begin routine through the end routine. The syntax for accessing them also differs from local symbol references, which makes it easier for a PFA host to statically analyze how they are used.

Cells store individual, named values of a specific type. The scoring engine below reproduces the fold-method example by storing the tally in a cell of type string. It is somewhat more cumbersome to use a persistent cell rather than the fold method, but a few interacting cells can perform more complex tasks than the fold method alone.

Cells cannot be created or destroyed at runtime, and they must be initialized before the begin routine runs. (In the case above, the initial value is an empty string.) Pools do not have this restriction: new items can be added to a pool at runtime, so pools can be used to gather data into tables.

When the engine above encounters a new word, it creates a new entry in the wordCount table with value 0, and when it encounters a previously seen word, it increments that word's count. It then outputs the number of occurrences of "hello" seen so far. (Library functions exist for managing count tables; this one was built manually for illustration.)
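A rough Python model of the word-count pool follows. The `Pool` class, its `update` method, and the rule that the update function is also applied to the starting value of a new key are assumptions of this sketch, not a statement of PFA's exact semantics. The point is that keys appear at runtime, which a cell cannot do.

```python
from typing import Callable, Dict


class Pool:
    """Hypothetical model of a PFA pool of ints: items can be created at runtime."""

    def __init__(self) -> None:
        self._items: Dict[str, int] = {}

    def update(self, key: str, to: Callable[[int], int], init: int) -> int:
        # Assumption of this sketch: a missing key starts from `init`,
        # and the update function is then applied to that starting value.
        new_value = to(self._items.get(key, init))
        self._items[key] = new_value
        return new_value

    def get(self, key: str, default: int) -> int:
        return self._items.get(key, default)


word_count = Pool()


def action(word: str) -> int:
    """Count the incoming word, then report how many times "hello" has appeared."""
    word_count.update(word, lambda old: old + 1, init=0)
    return word_count.get("hello", 0)


print([action(w) for w in ["hello", "world", "hello", "pfa"]])  # [1, 1, 2, 2]
```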

A pool of type X is like a cell of type {"type": "map", "values": X}, except in how it is updated. A cell's value is always replaced as a whole in one atomic action, whereas a pool is updated one item at a time: each item is replaced atomically, but the pool as a whole is not. For private data (accessed by a single scoring engine), this difference shows up only in the runtime speed of very large pools, since updating one pool item is faster than replacing an entire map-valued cell in which a single value changed. For shared data, the difference matters more, because the granularity of atomic updates changes the behavior of the system.
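The difference in atomic granularity can be sketched in Python. `MapCell` and `ItemPool` are hypothetical stand-ins: the cell swaps in a whole new read-only map in one step, so a reader holds either the complete old map or the complete new one, while the pool replaces items one at a time, so a reader that runs between two item updates can observe a mix of old and new values.

```python
from types import MappingProxyType
from typing import Callable, Dict, Mapping

Table = Mapping[str, int]


class MapCell:
    """Hypothetical model: a cell holds one immutable value, replaced as a whole."""

    def __init__(self, value: Table) -> None:
        self.value: Table = MappingProxyType(dict(value))

    def update(self, fn: Callable[[Dict[str, int]], None]) -> None:
        copy = dict(self.value)              # copy every entry
        fn(copy)                             # change as many entries as needed
        self.value = MappingProxyType(copy)  # one swap: readers see all or nothing


class ItemPool:
    """Hypothetical model: a pool's unit of atomic replacement is one item."""

    def __init__(self, items: Table) -> None:
        self.items: Dict[str, int] = dict(items)

    def update(self, key: str, fn: Callable[[int], int]) -> None:
        self.items[key] = fn(self.items[key])  # only this item is replaced


def set_a_and_b(table: Dict[str, int]) -> None:
    table["a"] = 1
    table["b"] = 1


start = {"a": 0, "b": 0, "c": 0}
cell, pool = MapCell(start), ItemPool(start)

# Cell: a reader either holds the old map or the new one, never a mix.
before = cell.value
cell.update(set_a_and_b)
print(dict(before), dict(cell.value))
# {'a': 0, 'b': 0, 'c': 0} {'a': 1, 'b': 1, 'c': 0}

# Pool: a reader that runs between two item updates sees a mix.
pool.update("a", lambda old: 1)
seen_mid = (pool.items["a"], pool.items["b"])
pool.update("b", lambda old: 1)
print(seen_mid)  # (1, 0)
```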

Concurrent access of shared data

Multiple scoring engines can share the same cells and pools. This may be because the PFA host generates many instances of a scoring engine from a PFA document (mappers in Hadoop, for instance) or because different scoring engines are coordinated through a database. The mechanism and reasons for sharing data are beyond the scope of the PFA specification, but the synchronization policy is not, since it affects how data are calculated.

PFA uses a read-copy-update policy to maximize the availability of data for reading; it can be implemented in any environment that supports locks. In this policy, a write operation begins by locking the memory element in such a way that it can still be read, but only one writer has access to write. The writer then copies the value and operates on the copy. When finished, the writer atomically swaps a pointer from the old version to the new version; from that instant on, readers see the new value. Finally, the writer releases the lock so that other writers can proceed.
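Below is a minimal read-copy-update sketch in Python. `RCUCell` is a hypothetical name, and the sketch assumes the stored values are immutable, so the update function returns a new version instead of editing the old one. Rebinding an attribute in CPython is a single step that stands in for the atomic pointer swap; readers never wait for the lock.

```python
import threading
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class RCUCell(Generic[T]):
    """Minimal read-copy-update sketch (hypothetical, not a PFA host API).

    Readers never take the lock; writers take turns holding it.
    """

    def __init__(self, value: T) -> None:
        self._value = value
        self._write_lock = threading.Lock()

    def read(self) -> T:
        # Readers get whichever version the reference points to right now:
        # the complete old value or the complete new one, never a partial one.
        return self._value

    def update(self, fn: Callable[[T], T]) -> T:
        with self._write_lock:     # only one writer at a time; reads continue
            new = fn(self._value)  # fn builds a new version; the old one is untouched
            self._value = new      # swap the reference; later reads see `new`
        return new                 # the lock is released on leaving the block


counter = RCUCell(0)


def writer() -> None:
    for _ in range(1000):
        counter.update(lambda old: old + 1)


threads = [threading.Thread(target=writer) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.read())  # 4000
```

Because each writer holds the lock for the whole copy-and-swap, none of the 4000 increments is lost, while a reader calling `read` at the same time simply gets the current complete version.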

To avoid the possibility of deadlock, a PFA host should additionally verify that each write operation writes to only one cell or pool item. This would be a rather draconian policy for general-purpose programming, but numerical calculations rarely need more structural complexity than this.

Although the Google App Engine PFA host does not actually implement sharing, it performs the deadlock check described above. The PFA document below cannot be executed because it violates this policy: it uses changeOne to update a cell, but changeOne has a side effect (it changes another cell or pool) indirectly, through functionThatCallsChangeTwo.
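A host could perform this check by walking the call graph of each function used to update a cell or pool. The Python sketch below assumes the host has already summarized the document into two hypothetical tables, `calls` and `writes`. The function names come from the example described above, but the call structure and the cell name `two` are assumed for illustration.

```python
from typing import Dict, Set


def has_side_effects(fn: str, calls: Dict[str, Set[str]],
                     writes: Dict[str, Set[str]]) -> bool:
    """Return True if `fn` or any function it calls, directly or indirectly,
    writes to a cell or pool."""
    stack = [fn]
    visited: Set[str] = set()
    while stack:
        current = stack.pop()
        if current in visited:
            continue  # already checked; this also stops on recursive calls
        visited.add(current)
        if writes.get(current):
            return True
        stack.extend(calls.get(current, set()))
    return False


# Hypothetical summary of a document shaped like the one described above:
# which user functions each function calls, and which storage it writes.
calls = {
    "changeOne": {"functionThatCallsChangeTwo"},
    "functionThatCallsChangeTwo": set(),
}
writes = {
    "changeOne": set(),
    "functionThatCallsChangeTwo": {"two"},  # assumed: updates a cell named two
}

# changeOne is the update function for one cell, so it must be free of side
# effects; the indirect write through its callee means the host rejects it.
print(has_side_effects("changeOne", calls, writes))  # True
```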

Immutable data

All values in PFA (numbers, strings, arrays, maps, record structures, etc.) are immutable in the sense that PFA has no library functions to change them in place. Whenever you want to change one item of an array, one key-value pair in a map, or one field in a record, the entire array, map, or record must be replaced by a new one with the single item replaced.

For single-item replacements, this is slower than an in-place update because unaffected values must be copied. But it is not as slow as you might think: since a value is guaranteed never to be modified in place, an implementation can share it among many objects. The new container needs only a shallow copy of the changed item's siblings (references to them), not a deep copy; this is called structural sharing. When one needs a copy of a whole data structure, it can simply be referenced, with no copying at all.
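Structural sharing can be illustrated with path copying in a persistent binary search tree. In this Python sketch, the `Node` type and `insert` function are hypothetical, not part of PFA. Only the nodes between the root and the changed key are rebuilt; in a balanced tree that is O(log(N)) nodes, and everything off that path is shared by reference.

```python
from typing import NamedTuple, Optional


class Node(NamedTuple):
    """Immutable binary-search-tree node (NamedTuple fields cannot be reassigned)."""
    key: int
    value: str
    left: Optional["Node"] = None
    right: Optional["Node"] = None


def insert(node: Optional[Node], key: int, value: str) -> Node:
    """Return a new tree in which `key` maps to `value`; the input tree is unchanged.

    Only nodes on the path from the root to `key` are rebuilt (path copying);
    every subtree off that path is shared with the original tree.
    """
    if node is None:
        return Node(key, value)
    if key < node.key:
        return node._replace(left=insert(node.left, key, value))
    if key > node.key:
        return node._replace(right=insert(node.right, key, value))
    return node._replace(value=value)


tree = None
for k in [4, 2, 6, 1, 3, 5, 7]:
    tree = insert(tree, k, str(k))

updated = insert(tree, 1, "one")
print(updated.right is tree.right)                    # True: shared, not copied
print(updated.left is tree.left)                      # False: on the path, rebuilt
print(tree.left.left.value, updated.left.left.value)  # 1 one
snapshot = tree  # an independent "copy" is just another reference
```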

For example, a balanced tree-like data structure with N nodes can be updated and “copied” with the following time complexities.

data          update one element    obtain an independent copy
mutable       O(1)                  O(N)
immutable     O(log(N))             O(1)

Mutable data is faster if single-element updates vastly outnumber whole-structure copies; immutable data is faster in the opposite extreme. PFA uses immutable data for several reasons.

Model parameters

One common application of PFA is to represent statistical models. Common model types, which are implemented by PFA library functions, can be expressed as structured sets of parameters for the library functions to interpret. These parameter sets are often stored in persistent cells to ensure that they aren’t re-created in every action and to allow the model to change. (Some workflows update the model as they collect data; others do not.)

Here is an example of a small (4-leaf) decision tree. The Datum record defines the format of incoming data to be scored by the tree and TreeNode defines the structure of the tree itself. The tree in this example is initialized and never updated.

This PFA document has a one-line action that simply applies the tree to the data. Although calls to model.tree.simpleWalk are type-checked, its signature is generic: it requires only the record fields that are needed to walk through a tree and places minimal constraints on their types.

For instance, this tree is a decision tree because its pass and fail types are [string, TreeNode] (union of string and TreeNode), meaning that pass and fail may lead to strings (categorical scores) or additional TreeNodes. If string were replaced with double, this would be a regression tree, and model.tree.simpleWalk would be just as capable of evaluating it. If string were replaced with {"type": "array", "items": "double"}, it would be a multivariate regression tree. Still other possibilities are left open because the model.tree.simpleWalk signature is generic.
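The generic walk can be mimicked in Python to show why leaf types do not matter. The dict-based node layout and the names `simple_walk`, `field_test`, and `is_node` are hypothetical, loosely modeled on the description above rather than on the real model.tree.simpleWalk signature. The walker only follows pass/fail branches and returns whatever non-node value it reaches, so the same code scores a decision tree and a regression tree.

```python
import operator
from typing import Any, Callable, Dict

# Hypothetical node layout loosely modeled on TreeNode: test one field of the
# datum, then follow "pass" or "fail", each of which is another node or a leaf.
Node = Dict[str, Any]
Datum = Dict[str, Any]

OPS = {"<": operator.lt, "<=": operator.le, ">": operator.gt,
       ">=": operator.ge, "==": operator.eq, "!=": operator.ne}


def is_node(x: Any) -> bool:
    return isinstance(x, dict) and "field" in x


def field_test(datum: Datum, node: Node) -> bool:
    """Compare one field of the datum with the node's threshold."""
    return OPS[node["operator"]](datum[node["field"]], node["value"])


def simple_walk(datum: Datum, node: Node,
                test: Callable[[Datum, Node], bool]) -> Any:
    """Follow pass/fail branches until reaching a leaf; the leaf's type is never inspected."""
    while True:
        branch = node["pass"] if test(datum, node) else node["fail"]
        if not is_node(branch):
            return branch
        node = branch


# Same shape, different leaf types: string scores vs. numeric scores.
decision_tree = {
    "field": "x", "operator": "<", "value": 5.0,
    "pass": {"field": "y", "operator": "<", "value": 2.0, "pass": "red", "fail": "green"},
    "fail": {"field": "y", "operator": "<", "value": 8.0, "pass": "blue", "fail": "yellow"},
}
regression_tree = {
    "field": "x", "operator": "<", "value": 5.0,
    "pass": {"field": "y", "operator": "<", "value": 2.0, "pass": 1.0, "fail": 2.0},
    "fail": {"field": "y", "operator": "<", "value": 8.0, "pass": 3.0, "fail": 4.0},
}

print(simple_walk({"x": 3.0, "y": 1.0}, decision_tree, field_test))    # red
print(simple_walk({"x": 7.0, "y": 9.0}, regression_tree, field_test))  # 4.0
```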

Alternate output streams: exceptions and logs

Although PFA is designed to minimize the number of things that can go wrong at runtime, some runtime errors are still possible. For instance, accessing the thirteenth element of a twelve-element array is a runtime error, since type-checkers and static analyzers cannot determine the length of an array from the PFA document alone. This kind of condition causes a non-local exit from the scoring procedure with an error message; in other words, an exception.

All runtime exceptions that the library functions can throw are documented in the reference. PFA authors can also define their own exceptions. User-defined exceptions are typically used to skip action events that present an invalid state: a condition more specific than what type-checking eliminates, and not an ordinary case of “missing data”, which should be handled with a type-safe null. The PFA language has no try-catch equivalent.

If a partial update would leave some of the scoring engine’s cells or pools in an invalid state, those cells and pools can request rollback. At the beginning of each action, the cells and pools with rollback set to true are copied; if a library or user exception occurs during the action, their values are restored to those copies, that is, to the values they had when the action began. This example demonstrates exceptions and rollback:

The counter outputs 1, 2 if rollback is true, and 4, 5 if rollback is false or not present.
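The rollback rule can be modeled in a few lines of Python. The `Engine` class, the `count_then_check` action, and the input stream are all hypothetical: the action increments a counter cell and then raises an exception for negative input. With rollback, the failed actions leave the counter untouched; without it, their increments persist.

```python
from typing import Any, Callable, Dict, Iterable, List

Cells = Dict[str, Any]


class Engine:
    """Hypothetical model of per-action rollback (not a PFA host API)."""

    def __init__(self, cells: Cells, rollback: Dict[str, bool]) -> None:
        self.cells = cells
        self.rollback = rollback

    def run_action(self, action: Callable[[Cells, Any], Any], datum: Any) -> Any:
        # Save the rollback cells at the start of the action. The values are
        # immutable, so keeping references is enough; no deep copy is needed.
        saved = {name: self.cells[name]
                 for name, enabled in self.rollback.items() if enabled}
        try:
            return action(self.cells, datum)
        except Exception:
            self.cells.update(saved)  # restore only cells that requested rollback
            raise                     # the host still sees the exception


def count_then_check(cells: Cells, datum: int) -> int:
    cells["counter"] += 1
    if datum < 0:
        # Stands in for a user-defined exception that rejects invalid input.
        raise ValueError("negative input")
    return cells["counter"]


def score(stream: Iterable[int], rollback: bool) -> List[int]:
    engine = Engine({"counter": 0}, {"counter": rollback})
    outputs = []
    for datum in stream:
        try:
            outputs.append(engine.run_action(count_then_check, datum))
        except ValueError:
            pass  # a host would report the exception; there is no output
    return outputs


stream = [-1, -2, -3, 10, 20]  # hypothetical input: three bad data, then two good
print(score(stream, rollback=True))   # [1, 2]
print(score(stream, rollback=False))  # [4, 5]
```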

Since exceptions provide the host with information in a different way than ordinary output, exceptions may be considered an alternative form of output. Another auxiliary output is the log output.

Logging in PFA is usually for debugging. The {"log": ["x", "y", "z"]} form writes the values of x, y, and z in JSON form on the log output. The PFA host has complete control over how logs are handled, including whether they are handled at all. Note that x, y, and z are expressions, not strings. To write a plain string to the log, wrap it as {"string": "..."} or ["..."] to make a literal string expression.
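To see why x, y, and z are treated as expressions, here is a tiny hypothetical evaluator in Python for three expression forms: a bare string is a symbol reference, while {"string": ...} and a one-element array are string literals. The space-separated JSON line produced by `log` is an assumption of this sketch; a real host decides how logs are written.

```python
import json
from typing import Any, Dict, List


def evaluate(expr: Any, symbols: Dict[str, Any]) -> Any:
    """Evaluate a tiny, hypothetical subset of PFA expression syntax."""
    if isinstance(expr, str):
        return symbols[expr]      # a bare JSON string names a symbol
    if isinstance(expr, dict) and set(expr) == {"string"}:
        return expr["string"]     # {"string": ...} is a string literal
    if isinstance(expr, list) and len(expr) == 1 and isinstance(expr[0], str):
        return expr[0]            # ["..."] is the short form of a string literal
    if isinstance(expr, (int, float)):
        return expr               # numbers are literals
    raise ValueError(f"unsupported expression: {expr!r}")


def log(args: List[Any], symbols: Dict[str, Any]) -> str:
    """Render one log line: each evaluated argument as JSON, space-separated.

    The space-separated layout is an assumption; a real host decides the format.
    """
    return " ".join(json.dumps(evaluate(arg, symbols)) for arg in args)


symbols = {"x": 3, "y": [1.5, 2.5], "z": {"ok": True}}
print(log(["x", "y", "z"], symbols))            # 3 [1.5, 2.5] {"ok": true}
print(log([{"string": "x is"}, "x"], symbols))  # "x is" 3
```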