Spark

GroupState (mapGroupsWithState/f

2019-10-28  海边的贝壳林

Wrapper class for interacting with per-group state data in mapGroupsWithState and flatMapGroupsWithState operations on KeyValueGroupedDataset.

Detailed description of the [map/flatMap]GroupsWithState operation.

Both mapGroupsWithState and flatMapGroupsWithState in KeyValueGroupedDataset invoke the user-given function on each group (defined by the grouping function in Dataset.groupByKey()) while maintaining user-defined per-group state between invocations. For a static batch Dataset, the function is invoked once per group. For a streaming Dataset, the function is invoked for each group repeatedly, in every trigger.
That is, in every batch of the StreamingQuery, the function is invoked once for each group that has data in the trigger. Furthermore, if a timeout is set, the function is also invoked on timed-out groups (more detail below).

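The function is invoked with the following parameters: the key of the group, an iterator containing all the values for this group, and the user-defined state object set by previous invocations of the function.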

In the case of a batch Dataset, there is only one invocation and the state object will be empty, as there is no prior state. Essentially, for batch Datasets, [map/flatMap]GroupsWithState is equivalent to [map/flatMap]Groups, and any updates to the state and/or timeouts have no effect.
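As a rough illustration of the batch case, the sketch below runs mapGroupsWithState over a small static Dataset. The object name BatchStateSketch, the function sumWithState, the sample data, and the local SparkSession settings are all assumptions made for this example; it also assumes Spark SQL is on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical sketch; names and data are illustrative only.
object BatchStateSketch {

  // Sums one group's values and records whether any prior state existed.
  def sumWithState(
      key: String,
      values: Iterator[(String, Int)],
      state: GroupState[Int]): (String, Int, Boolean) = {
    val hadState = state.exists        // no prior state in a batch query
    val total = values.map(_._2).sum
    state.update(total)                // allowed, but not carried anywhere in batch
    (key, total, hadState)
  }

  // Runs the function once per group over a static (batch) Dataset.
  def run(spark: SparkSession): Array[(String, Int, Boolean)] = {
    import spark.implicits._
    Seq(("a", 1), ("a", 2), ("b", 3)).toDS()
      .groupByKey(_._1)
      .mapGroupsWithState[Int, (String, Int, Boolean)](
        GroupStateTimeout.NoTimeout())(sumWithState)
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("batch-state-sketch")
      .getOrCreate()
    run(spark).sortBy(_._1).foreach(println)
    spark.stop()
  }
}
```

Because the Dataset is static, each group is visited exactly once and hadState should be false for every group, which is why the result matches what a plain mapGroups would produce.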

The major difference between mapGroupsWithState and flatMapGroupsWithState is that the former allows the function to return one and only one record, whereas the latter allows the function to return any number of records (including no records). Furthermore, flatMapGroupsWithState is associated with an operation output mode, which can be either Append or Update. Semantically, this defines whether the output records of one trigger effectively replace the previously output records (from previous triggers) or are appended to the list of previously output records. Essentially, this defines how the Result Table (refer to the semantics in the programming guide) is updated, and allows us to reason about the semantics of later operations.
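A minimal sketch of flatMapGroupsWithState in Update mode may make the contrast concrete. Everything specific here is an assumption for illustration: the object name FlatMapStateSketch, the helper recordsFor with its "emit only on even counts" rule, the use of the built-in rate source as input, the grouping by value modulo 3, and the ten-second run time.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical sketch; names, data source and emit rule are illustrative only.
object FlatMapStateSketch {

  // Illustrative rule: emit a record only when the running count is even,
  // so a single invocation returns either zero records or one record.
  def recordsFor(key: Long, count: Long): Iterator[(Long, Long)] =
    if (count % 2 == 0) Iterator((key, count)) else Iterator.empty

  // Keeps a running count per key in GroupState and returns an Iterator of output.
  def countEvents(
      key: Long,
      values: Iterator[Long],
      state: GroupState[Long]): Iterator[(Long, Long)] = {
    val newCount = state.getOption.getOrElse(0L) + values.size
    state.update(newCount)
    recordsFor(key, newCount)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("flatmap-state-sketch")
      .getOrCreate()
    import spark.implicits._

    // The rate source produces rows with a `timestamp` and a long `value` column.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select($"value")
      .as[Long]

    val counts = events
      .groupByKey(_ % 3)
      .flatMapGroupsWithState[Long, (Long, Long)](
        OutputMode.Update(), GroupStateTimeout.NoTimeout())(countEvents)

    // The query's output mode is chosen to match the operation's Update mode.
    val query = counts.writeStream.outputMode("update").format("console").start()
    query.awaitTermination(10000)   // let it run for about ten seconds
    query.stop()
    spark.stop()
  }
}
```

With mapGroupsWithState the function would have to return exactly one record per invocation; returning an Iterator is what lets countEvents stay silent in triggers where the count is odd.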

Important points to note about the function (both mapGroupsWithState and flatMapGroupsWithState).

Important points to note about using GroupState.

Important points to note about using GroupStateTimeout.

Scala example of using GroupState:

 // A mapping function that maintains an integer state for string keys and returns a string.
 // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
 def mappingFunction(key: String, value: Iterator[Int], state: GroupState[Int]): String = {

   if (state.hasTimedOut) {                // If called when timing out, remove the state
     state.remove()

   } else if (state.exists) {              // If state exists, use it for processing
     val existingState = state.get         // Get the existing state
     val shouldRemove = ...                // Decide whether to remove the state
     if (shouldRemove) {
       state.remove()                      // Remove the state

     } else {
       val newState = ...
       state.update(newState)              // Set the new state
       state.setTimeoutDuration("1 hour")  // Set the timeout
     }

   } else {
     val initialState = ...
     state.update(initialState)            // Set the initial state
     state.setTimeoutDuration("1 hour")    // Set the timeout
   }
   ...
   // return something
 }

 dataset
   .groupByKey(...)
   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)

Java example of using GroupState:

 // A mapping function that maintains an integer state for string keys and returns a string.
 // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
 MapGroupsWithStateFunction<String, Integer, Integer, String> mappingFunction =
    new MapGroupsWithStateFunction<String, Integer, Integer, String>() {

      @Override
      public String call(String key, Iterator<Integer> value, GroupState<Integer> state) {
        if (state.hasTimedOut()) {            // If called when timing out, remove the state
          state.remove();

        } else if (state.exists()) {            // If state exists, use it for processing
          int existingState = state.get();      // Get the existing state
          boolean shouldRemove = ...;           // Decide whether to remove the state
          if (shouldRemove) {
            state.remove();                     // Remove the state

          } else {
            int newState = ...;
            state.update(newState);             // Set the new state
            state.setTimeoutDuration("1 hour"); // Set the timeout
          }

        } else {
          int initialState = ...;
          state.update(initialState);           // Set the initial state
          state.setTimeoutDuration("1 hour");   // Set the timeout
        }
        ...
        // return something
      }
    };

 dataset
     .groupByKey(...)
     .mapGroupsWithState(
         mappingFunction, Encoders.INT(), Encoders.STRING(), GroupStateTimeout.ProcessingTimeTimeout());

Original:
org.apache.spark.sql.streaming.GroupState#scala-doc
