Streaming Expressions

2020-01-12

Streaming Expressions provide a simple yet powerful stream processing language for Solr Cloud.

Streaming expressions are a suite of functions that can be combined to perform many different parallel computing tasks. These functions are the basis for the Parallel SQL Interface.

There is a growing library of functions that can be combined to implement:

  • Request/response stream processing

  • Batch stream processing

  • Fast interactive MapReduce

  • Aggregations (both pushed-down faceted and shuffling MapReduce)

  • Parallel relational algebra (distributed joins, intersections, unions, complements)

  • Publish/subscribe messaging

  • Distributed graph traversal

  • Machine learning and parallel iterative model training

  • Anomaly detection

  • Recommendation systems

  • Retrieve and rank services

  • Text classification and feature extraction

  • Streaming NLP

  • Statistical Programming

Streams from outside systems can be joined with streams originating from Solr and users can add their own stream functions by following Solr’s Java streaming API.

Important

Both streaming expressions and the streaming API are considered experimental, and the APIs are subject to change.

Stream Language Basics

Streaming expressions are composed of streaming functions that operate on a Solr collection. They emit a stream of tuples (key/value Maps).

Many of the provided streaming functions are designed to work with entire result sets rather than the top N results like normal search. This is supported by the /export handler.

Some streaming functions act as stream sources to originate the stream flow. Other streaming functions act as stream decorators to wrap other stream functions and perform operations on the stream of tuples. Many stream functions can be parallelized across a worker collection. This can be particularly powerful for relational algebra functions.
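As a rough mental model of the source/decorator split, the following plain-Java sketch treats a tuple as a Map and a stream as a java.util.stream.Stream of tuples. The names TupleStreamSketch, source, having, and demo are hypothetical and are not part of Solr's streaming API; the sketch only illustrates how a decorator wraps another stream and operates on its tuples.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Conceptual sketch only: these names are hypothetical, not Solr classes.
final class TupleStreamSketch {

    // A "stream source" originates tuples; here it replays an in-memory list.
    static Stream<Map<String, Object>> source(List<Map<String, Object>> tuples) {
        return tuples.stream();
    }

    // A "stream decorator" wraps another stream and operates on its tuples;
    // this one passes through only the tuples that match a predicate.
    static Stream<Map<String, Object>> having(Stream<Map<String, Object>> inner,
                                              Predicate<Map<String, Object>> keep) {
        return inner.filter(keep);
    }

    // Wraps a source in a decorator and returns the ids of the surviving tuples.
    static List<String> demo() {
        List<Map<String, Object>> docs = List.of(
                Map.of("id", "1", "a_i", 5),
                Map.of("id", "2", "a_i", 12));
        return having(source(docs), t -> ((Integer) t.get("a_i")) > 10)
                .map(t -> (String) t.get("id"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In a real expression the same nesting appears textually: the decorator function takes the source function as its first argument.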

Streaming Requests and Responses

Solr has a /stream request handler that takes streaming expression requests and returns the tuples as a JSON stream. This request handler is implicitly defined, meaning there is nothing that has to be defined in solrconfig.xml - see Implicit RequestHandlers.

The /stream request handler takes one parameter, expr, which is used to specify the streaming expression. For example, this curl command encodes and POSTs a simple search() expression to the /stream handler:

curl --data-urlencode 'expr=search(enron_emails,
                                   q="from:1800flowers*",
                                   fl="from, to",
                                   sort="from asc",
                                   qt="/export")' http://localhost:8983/solr/enron_emails/stream

Details of the parameters for each function are included below.
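The same request can be issued from Java with the JDK's built-in HTTP client (Java 11 or later). This is a minimal sketch, not an official client: the class and method names (StreamRequestSketch, formBody, buildRequest) are hypothetical, and it assumes the /stream handler accepts a form-encoded POST just as it does from curl. Note that URLEncoder encodes spaces as "+" where curl emits "%20"; both are valid form encodings.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;

// Hypothetical helper class; only JDK classes are used.
final class StreamRequestSketch {

    // URL-encodes the expression as the "expr" form parameter,
    // similar to what curl's --data-urlencode does.
    static String formBody(String expr) {
        return "expr=" + URLEncoder.encode(expr, StandardCharsets.UTF_8);
    }

    // Builds a form-encoded POST to the given /stream URL.
    static HttpRequest buildRequest(String streamUrl, String expr) {
        return HttpRequest.newBuilder(URI.create(streamUrl))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(formBody(expr)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String expr = "search(enron_emails, q=\"from:1800flowers*\", fl=\"from, to\", "
                + "sort=\"from asc\", qt=\"/export\")";
        System.out.println(formBody(expr));

        // Only contact a server when a /stream URL is passed as the first argument,
        // e.g. http://localhost:8983/solr/enron_emails/stream
        if (args.length > 0) {
            HttpRequest request = buildRequest(args[0], expr);
            // Read the body line by line instead of buffering the whole result set.
            HttpResponse<Stream<String>> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofLines());
            response.body().forEach(System.out::println);
        }
    }
}
```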

For the above example the /stream handler responded with the following JSON response:

{"result-set":{"docs":[
   {"from":"1800flowers.133139412@s2u2.com","to":"lcampbel@enron.com"},
   {"from":"1800flowers.93690065@s2u2.com","to":"jtholt@ect.enron.com"},
   {"from":"1800flowers.96749439@s2u2.com","to":"alewis@enron.com"},
   {"from":"1800flowers@1800flowers.flonetwork.com","to":"lcampbel@enron.com"},
   {"from":"1800flowers@1800flowers.flonetwork.com","to":"lcampbel@enron.com"},
   {"from":"1800flowers@1800flowers.flonetwork.com","to":"lcampbel@enron.com"},
   {"from":"1800flowers@1800flowers.flonetwork.com","to":"lcampbel@enron.com"},
   {"from":"1800flowers@1800flowers.flonetwork.com","to":"lcampbel@enron.com"},
   {"from":"1800flowers@shop2u.com","to":"ebass@enron.com"},
   {"from":"1800flowers@shop2u.com","to":"lcampbel@enron.com"},
   {"from":"1800flowers@shop2u.com","to":"lcampbel@enron.com"},
   {"from":"1800flowers@shop2u.com","to":"lcampbel@enron.com"},
   {"from":"1800flowers@shop2u.com","to":"ebass@enron.com"},
   {"from":"1800flowers@shop2u.com","to":"ebass@enron.com"},
   {"EOF":true,"RESPONSE_TIME":33}]}
}

Note that the last tuple in the above example stream is {"EOF":true,"RESPONSE_TIME":33}. The EOF indicates the end of the stream. To process the JSON response, you’ll need to use a streaming JSON implementation, because streaming expressions are designed to return the entire result set, which may have millions of records. In your JSON client you’ll need to iterate over each doc (tuple) and check for the EOF tuple to determine the end of the stream.
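The EOF check itself can be sketched as follows. This example assumes that a streaming JSON parser of your choice (not shown) hands over each doc as a Map, one at a time; the names EofLoopSketch, isEof, and consume are hypothetical.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of a client-side loop that stops at the EOF tuple.
final class EofLoopSketch {

    // The end-of-stream marker is a tuple whose "EOF" key is true.
    static boolean isEof(Map<String, Object> tuple) {
        return Boolean.TRUE.equals(tuple.get("EOF"));
    }

    // Hands each tuple to the handler until the EOF tuple is seen;
    // returns the number of tuples processed before EOF.
    static int consume(Iterator<Map<String, Object>> docs,
                       Consumer<Map<String, Object>> handler) {
        int count = 0;
        while (docs.hasNext()) {
            Map<String, Object> tuple = docs.next();
            if (isEof(tuple)) {
                break;
            }
            handler.accept(tuple);
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // Stand-in for tuples produced incrementally by a streaming parser.
        List<Map<String, Object>> docs = List.of(
                Map.of("from", "1800flowers@shop2u.com", "to", "ebass@enron.com"),
                Map.of("EOF", true, "RESPONSE_TIME", 33));
        int n = consume(docs.iterator(),
                t -> System.out.println(t.get("from") + " -> " + t.get("to")));
        System.out.println(n + " tuple(s) before EOF");
    }
}
```

Because the loop pulls from an Iterator, it never needs the whole result set in memory, which is the point of using a streaming parser.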

The org.apache.solr.client.solrj.io package provides Java classes that compile streaming expressions into streaming API objects. These classes can be used to execute streaming expressions from inside a Java application. For example:

    // zkServer is assumed to come from the surrounding code (for example, a test fixture).
    String zkHost = zkServer.getZkAddress();
    StreamFactory streamFactory = new DefaultStreamFactory().withCollectionZkHost("collection1", zkHost);
    // "?$?" marks where InjectionDefense substitutes the parameter added below.
    InjectionDefense defense = new InjectionDefense("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s asc\"), workers=\"2\", zkHost=\"?$?\", sort=\"a_s asc\")");
    defense.addParameter(zkHost);
    ParallelStream pstream = (ParallelStream)streamFactory.constructStream(defense.safeExpressionString());

Note that InjectionDefense need only be used if the string being inserted could contain user-supplied data. See the javadoc for InjectionDefense for usage details and SOLR-12891 for an example of the potential risks. Also note that, for security reasons, normal parameter substitution no longer applies to the expr parameter unless the JVM has been started with -DStreamingExpressionMacros=true (usually via solr.in.sh).

Data Requirements

Because streaming expressions rely on the /export handler, many of the field and field type requirements to use /export are also requirements for /stream, particularly for the sort and fl parameters. Please see the section Exporting Result Sets for details.

Request Routing

Streaming Expressions respect the shards.preference parameter for any call to Solr.

The value of shards.preference that is used to route requests is determined in the following order. The first option available is used.

  • Provided as a parameter in the streaming expression (e.g., search(…., shards.preference="replica.type:PULL"))

  • Provided in the URL parameters of the streaming expression (e.g., http://solr_url:8983/solr/stream?expr=….&shards.preference=replica.type:PULL)

  • Set as a default in the cluster properties
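The precedence described above amounts to "first available value wins". The sketch below illustrates that rule only; it is not Solr's implementation, and the class, method, and parameter names are hypothetical. It assumes an unset value is represented as null or a blank string.

```java
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical illustration of the lookup order; not Solr code.
final class ShardsPreferenceSketch {

    // Returns the first value that is set, checking the expression parameter,
    // then the URL parameter, then the cluster-property default.
    static Optional<String> resolve(String fromExpression,
                                    String fromUrlParams,
                                    String fromClusterProperties) {
        return Stream.of(fromExpression, fromUrlParams, fromClusterProperties)
                .filter(v -> v != null && !v.isBlank())
                .findFirst();
    }

    public static void main(String[] args) {
        // The expression-level value overrides the URL-level one.
        System.out.println(resolve("replica.type:PULL", "replica.type:TLOG", null));
        // With nothing set anywhere, no preference applies.
        System.out.println(resolve(null, null, null));
    }
}
```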

Adding Custom Expressions

You can create your own custom expressions by implementing the Expressible interface. To add a custom expression to the list of known mappings for the /stream handler, declare it as a plugin in solrconfig.xml via:

<expressible name="custom" class="org.example.CustomStreamingExpression"/>

Types of Streaming Expressions

About Stream Sources

Stream sources originate streams. The most commonly used source is search, which runs a query against a Solr collection and emits the matching documents as tuples.

A full reference to all available source expressions is available in Stream Source Reference.

About Stream Decorators

Stream decorators wrap other stream functions or perform operations on a stream.

A full reference to all available decorator expressions is available in Stream Decorator Reference.

About Stream Evaluators

Stream Evaluators can be used to evaluate (calculate) new values based on other values in a tuple. That newly evaluated value can be put into the tuple (as part of a select(…​) clause), used to filter streams (as part of a having(…​) clause), and for other things. Evaluators can contain field names, raw values, or other evaluators, giving you the ability to create complex evaluation logic, including conditional if/then choices.

When you use raw values as part of an evaluation, keep in mind the order in which evaluator parameters are parsed. The first rule that matches applies:

  1. If the parameter can be parsed into a valid number, then it is considered a number. For example, add(3,4.5)

  2. If the parameter can be parsed into a valid boolean, then it is considered a boolean. For example, eq(true,false)

  3. If the parameter can be parsed into a valid evaluator, then it is considered an evaluator. For example, eq(add(10,4),add(7,7))

  4. Otherwise, the parameter is considered a field name, even if it is quoted. For example, eq(fieldA,"fieldB")
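The parsing order above can be sketched as a small classifier. This is an illustration under simplifying assumptions, not Solr's parser: the names EvaluatorParamSketch, Kind, and classify are hypothetical, numbers are detected with Double.parseDouble (which also accepts forms such as "NaN" or "1d"), booleans are matched case-insensitively, and anything shaped like name(...) is treated as an evaluator without checking that the function exists.

```java
import java.util.regex.Pattern;

// Hypothetical sketch of the parameter parsing order; not Solr code.
final class EvaluatorParamSketch {

    enum Kind { NUMBER, BOOLEAN, EVALUATOR, FIELD_NAME }

    // Simplification: any "name(...)" shape counts as an evaluator.
    private static final Pattern FUNCTION_CALL =
            Pattern.compile("[A-Za-z_][A-Za-z0-9_]*\\(.*\\)");

    // Applies the four rules in order; the first match wins.
    static Kind classify(String param) {
        String p = param.trim();
        if (isNumber(p)) {
            return Kind.NUMBER;
        }
        if (p.equalsIgnoreCase("true") || p.equalsIgnoreCase("false")) {
            return Kind.BOOLEAN;
        }
        if (FUNCTION_CALL.matcher(p).matches()) {
            return Kind.EVALUATOR;
        }
        // Everything else, including quoted text, falls through to a field name.
        return Kind.FIELD_NAME;
    }

    private static boolean isNumber(String p) {
        try {
            Double.parseDouble(p);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (String p : new String[] {"3", "4.5", "true", "add(10,4)", "fieldA", "\"fieldB\""}) {
            System.out.println(p + " -> " + classify(p));
        }
    }
}
```

The fall-through in the last rule is why a quoted string is still read as a field name.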

If you wish to use a raw string as part of an evaluation, you will want to consider using the raw(string) evaluator. This will always return the raw value, no matter what is entered.

A full reference to all available evaluator expressions is available in Stream Evaluator Reference.