[RFC] Stream Dialect

Introduction

Streaming libraries and abstractions are becoming more popular as languages and tools adopt such concepts. Furthermore, some high-level synthesis tools provide library support for streaming code, which can then be lowered to hardware.
In the spirit of MLIR and CIRCT, we anticipate that providing higher-level
abstraction in the form of a dialect simplifies further implementation efforts by providing a uniform interface.

Over the last few months I have been working on a stream dialect with the goal of being lowered to hardware. @mortbopet told me that there was some interest in the CIRCT community (especially from @stephenneuendorffer) in adding such abstractions for HLS flows.
I’m trying to provide a brief description of my current approach to collect some feedback. If there is indeed interest in such an abstraction, I would gladly work towards upstreaming this dialect.

My current implementation is here: GitHub - Dinistro/circt-stream: A stream to RTL compiler based on MLIR and CIRCT

Types

The stream dialect introduces a single type that defines a stream by its element types. An element can either be an integer or a tuple of element types.

Examples:

!stream.stream<i64>
!stream.stream<tuple<i32, tuple<i8, i64>>>

I did not yet play around with floats, but supporting them will mainly depend on the lower levels.

Operations

There are two different kinds of operations:

  1. A set of operations that work directly with streams. These operations consume and produce a variable number of streams.
  2. Auxiliary operations that help to work with elements of the stream, e.g., packing or unpacking tuples, yielding elements, etc.

So far, the stream dialect supports the following set of stream operations: map, filter, reduce, and create.
The first three expect regions that define the computation to be performed on each stream element. Note that the region arguments differ depending on the operation and the element types of the streams passed in.

Example:

  %res = stream.map(%in) : (!stream.stream<i32>) -> !stream.stream<i32> {
  ^0(%val : i32):
    %0 = arith.constant 1 : i32
    %r = arith.addi %0, %val : i32
    stream.yield %r : i32
  }

The create operation produces a stream from a fixed set of values and is thus mainly used for integration testing.
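For illustration, a filter that keeps only strictly positive elements could look as follows. Its region yields an i1 that decides whether the element is forwarded; the exact syntax here is a sketch modeled on the map example above and may differ from the repository:

```mlir
%res = stream.filter(%in) : (!stream.stream<i32>) -> !stream.stream<i32> {
^0(%val : i32):
  %c0 = arith.constant 0 : i32
  // forward the element only if it is greater than zero
  %cond = arith.cmpi sgt, %val, %c0 : i32
  stream.yield %cond : i1
}
```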

Lowering

One natural lowering target for the streaming abstraction is the handshake dialect.
The handshake dialect is somewhat stable, and the StandardToHandshake pass can be reused to lower the regions of the operations.

The streaming abstraction can be lowered to a task pipelined handshake representation.
Each stream becomes a handshaked value of the element type, and all the operations defined on this stream are applied directly to these values.
Note that certain operations like filter and reduce might not produce an output for each incoming element, and thus, they terminate some of the tasks.

End-of-Stream signal

Some operations, e.g., reduce, only produce a result once the incoming stream terminates.
To allow such behavior, upon lowering, each stream provides an EOS signal that is asserted once
the stream ends. The lowering packs the element value and the EOS signal into one tuple to ensure that only one handshake mechanism is emitted for each stream.
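Concretely, under this scheme a stream type maps to a tuple of the element type and the EOS bit (a sketch of the mapping, not literal dialect syntax):

```mlir
// !stream.stream<i64> lowers to a single handshaked value of type:
//   tuple<i64, i1>   // (element value, end-of-stream flag)
// so only one valid/ready pair is materialized per stream.
```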

Open Questions

Memory

For some tasks it is necessary to have memory that an operation can use. Modeling this in the stream dialect is not too difficult, but lowering it to handshake is non-trivial. This memory will require an initialization phase, e.g., on reset, but handshake has no notion of such a thing.


I’m looking forward to feedback and questions. The code base contains many more examples of the different operators described here.


Thanks for sharing this @Dinistro, this is great to see. And thanks for the help in getting Handshake to a place that your Stream dialect can target it!

One question I have off the bat: do you have some ideas about what would utilize the Stream dialect?

I can imagine a few situations where an IR like this could be useful, and I’m curious if any of these is being considered.

One thing I thought of was the hls::stream<> style of interface, for example exposed by Vitis.

Another thing I thought of was in the “big data” processing frameworks, for example the DataStream API in Flink.

Finally, I can imagine some functional programming languages that expose a “stream” concept natively, for example the Stream module in Elixir.

These are just a few places I could imagine using a “stream” abstraction in some sort of compiler. But I’m curious what use-cases you had in mind.


Thanks for sharing this @Dinistro, this is great to see. And thanks for the help in getting Handshake to a place that your Stream dialect can target it!

You are welcome. It’s great to improve existing things while still saving a lot of work compared to implementing everything from scratch.

One question I have off the bat: do you have some ideas about what would utilize the Stream dialect?

We do not yet have a fleshed-out plan, but in essence my main goal is to provide HLS that indeed works on a “high level” by providing first-class language support. Whether we target this by raising from another abstraction, coming from a library like Flink, or using it in a DSL manner for HLS shouldn’t matter too much. During my thesis, we plan to compare against Vitis’ hls::stream<> library.

Another thing I thought of was in the “big data” processing frameworks, for example the DataStream API in Flink

In fact, I was taking some inspiration from the Flink framework, but some concepts are hard to model efficiently in hardware.

Finally, I can imagine some functional programming languages that expose a “stream” concept natively, for example the Stream module in Elixir.

I do not know Elixir, but it looks a bit like Java’s stream API, which is another influence. I’ll look deeper into Elixir as well to see if some other interesting concepts are applied there.

Awesome work, and nice to see an RFC with good support for lowering to an existing dialect.

Some questions:

  • In Handshake, we use max-SSA form to ensure that any value referenced within a block is also defined within said block (through either a block argument or an op within the block). This is because all SSA values referenced within a block must have a token whenever control enters that block. I could imagine that this restriction also applies to the inner regions of the stream ops. To avoid making the inner regions of the stream ops IsolatedFromAbove (which doesn’t make sense in a software context), a pass could be written where stream ops that capture SSA values from an outer scope are transformed into a reduce with the captured variable(s) being passed as an accumulator argument. An example below with a reduce op, but I imagine this is also possible with map/filter/… (the captured variables then have to be fed into the input stream of those ops):
func.func @reduce(%in: !stream.stream<i64>, %add : i64) -> !stream.stream<i64> {
    %res = stream.reduce(%in) {initValue = 0 : i64}: (!stream.stream<i64>) -> !stream.stream<i64> {
    ^0(%acc: i64, %val: i64):
        %r = arith.addi %acc, %val : i64
        // %add references an SSA value defined in outer scope
        %r2 = arith.addi %r, %add : i64
        stream.yield %r2 : i64
    }
    return %res : !stream.stream<i64>
}

// transforms to
func.func @reduce(%in: !stream.stream<i64>, %add : i64) -> !stream.stream<i64> {
   // %add is now captured in the accumulator
    %res = stream.reduce(%in) {initValue = {0, %add} : tuple<i64, i64>}: (!stream.stream<i64>) -> !stream.stream<tuple<i64, i64>> {
    ^0(%acc: tuple<i64, i64>, %val: i64):
        %accV:2 = stream.unpack %acc : tuple<i64, i64>
        %r = arith.addi %accV#0, %val : i64
        %r2 = arith.addi %r, %accV#1 : i64
        // pack the values to forward %add to the next iteration
        %r3 = stream.pack %r2, %accV#1 : tuple<i64, i64>
        stream.yield %r3 : tuple<i64, i64>
    }

    // Split the stream to discard the %add
    %resAcc = stream.map(%res) : (!stream.stream<tuple<i64, i64>>) -> !stream.stream<i64> {
    ^0(%val : tuple<i64, i64>):
      %val2:2 = stream.unpack %val : tuple<i64, i64>
      stream.yield %val2#0 : i64
    }

    return %resAcc : !stream.stream<i64>
}
  • Would it make sense to have a stream operation that can pop an arbitrary amount of inputs and push an arbitrary amount of outputs, e.g. 3 inputs and 2 outputs:
stream.func(%in) : (!stream.stream<i64>) -> !stream.stream<i64> {
^0(%0 : i64, %1 : i64, %2 : i64):
    %r = arith.addi %0, %1 : i64
    %r2 = arith.addi %1, %2 : i64
    stream.yield %r, %r2 : i64, i64
}

or ones which can emit multiple streams?

%out:2 = stream.func(%in) : (!stream.stream<i64>) -> (!stream.stream<i64>, !stream.stream<i64>) {
^0(%0 : i64, %1 : i64, %2 : i64):
    %r = arith.addi %0, %1 : i64
    %r2 = arith.addi %1, %2 : i64
    stream.yield (%r), (%r2) : i64, i64
}
  • What are your thoughts on passing streams around as arguments and return values of functions?

Thanks for the comments.

Regarding the max-SSA form: As of yet, the operations are implicitly IsolatedFromAbove, as referencing variables defined on the outside breaks lowering (I did not yet add a verifier for that). If we want to allow referencing other SSA values, capturing is for sure the way to go, as it will make lowering simpler. Note that it’s a bit difficult to lower this in general, as the semantics for such a variable are non-trivial:

  • Do we basically do a reduction with it, i.e., save it in a buffer and feed it back into the region (function) for each stream element?
  • What to do when a new input arrives? The stream will send a new ctrl signal for each stream element, but what about the variable?

Would it make sense to have a stream operation that can pop an arbitrary amount of inputs and push an arbitrary amount of outputs.

This might be a good idea. I wanted to implement flatMap at some point, which can emit multiple elements. Consuming multiple elements wasn’t considered yet, but windowing was.
As before, the semantics are very important, e.g., should the “multi-consumer” be executed for overlapping intervals or only for distinct ones?

or ones which can emit multiple streams?

This is something I’m planning to work on this week. There are two types of such operations: one that simply “forks” the stream to all its outputs, and one that has a region that computes the ID of the output stream.

What are your thoughts on passing streams around as arguments and return values of functions?

This is not yet supported, but shouldn’t be too hard to implement. This will only require an additional pattern for func::CallOp, I hope.

This is not yet supported, but shouldn’t be too hard to implement. This will only require an additional pattern for func::CallOp, I hope.

The software semantics of this are well understood, but I suspect the hardware side is very much non-trivial. Streams would need to be passed around by reference, and lowerings (or verifiers?) would need to decide whether single-reader-single-writer is the only thing that’s allowed, or if multiple readers/writers are allowed (with some form of arbitration in the lowering…).


Currently, the lowerings simply transform a !stream.stream<T> to tuple<T, i1>, where the i1 indicates the end-of-stream. Passing this to a handshake function/instance shouldn’t be difficult, or am I missing something?

The single-reader-single-writer topic is another design aspect I was already thinking about:

  • Having only one operation that writes into a stream is probably the easiest approach. If multiple streams have to be combined, we should do this explicitly with operations like merge (merge a set of elements), concat (combine finite streams), or gather (combine streams in a round-robin or FIFO manner).
  • Multiple consumers can either be allowed directly, or we introduce some kind of fork operation. The only thing that might be dangerous here is back pressure if one consumer stalls.
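To make the second option concrete, an explicit fork could look roughly like this (hypothetical syntax; such an op does not exist in the dialect yet):

```mlir
// duplicate each element of %in into two independent streams,
// so each consumer sees the full stream
%a, %b = stream.fork(%in) : (!stream.stream<i64>) -> (!stream.stream<i64>, !stream.stream<i64>)
```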

One thing I’m still a bit confused about is the handling of ctrl signals.
As of yet, the lowering assumes that a separate ctrl signal is emitted for each new element. Thus, each stream element is essentially a thread that runs through the circuit until it is either returned or terminated in an operation (filter, reduce).
Such a design implies that operations like split would produce multiple outgoing ctrl signals, while merge would expect multiple incoming ctrl signals. While I’m familiar with the handshake dialect, it’s not clear to me if this would break some of its invariants.

Note that one could even add an additional control-flow (with a separate ctrl signal) that would handle initialization of the streaming pipeline (memory, reading externally provided values, …).

The way I reason about the Handshake dialect is regarding the capabilities of the operators in a “peephole” sense - they have some function and some semantics for their in/out ready/valid handshaking (see this). It is then the abstraction that builds on top of it - for instance DHLS - which uses these operators in a way that ensures invariants of the abstraction, such as a program being deadlock-free (wrt. the CFG of the source program).

So looping back to this dialect, the task of designing the lowering (or rather, the hardware in general) is to:

  • make sure that the hardware design is deadlock free for stream programs in isolation.
  • make sure that the hardware design is deadlock free in interfacing with the DHLS abstraction.

One common pitfall is insufficient buffering on the output of merge-like operations. This can cause deadlocks when you have backedges in your control circuit, since a buffer is needed to break the control dependency between cycles.

Also, I think figures make these things easier to reason about, so feel free to post them (if you have any) to illustrate the different hardware lowering schemes you are considering.


As far as I can tell, the lowerings do not break the handshake dialect (except for missing task pipelining checks).

As I didn’t yet add multi-input operations, I didn’t look too much into buffer placement. There are some problems related to streams that get split, filtered, and merged again, but this is something the programmer has to take care of, I feel.

Also, I think figures makes these things easier to reason about, so feel free to post those (if you have them) to illustrate the different hardware lowering schemes that you are considering.

I do not yet have any figures for the lowering, but I will work on some as I need them either way for my thesis.

A potential lowering approach

As discussed, the implemented lowering approach breaks down for multiple stream inputs and outputs. Therefore, I tried to design the following approach, which only slightly differs from the one already implemented.

Types

Each stream is represented as a tuple with three elements: the element type, an EOS signal, and a ctrl signal.

Example:
!stream.stream<i64> becomes tuple<i64, i1, none>.

Packing these things into a tuple ensures that only one handshaking mechanism is emitted.

Operations with regions

Each operation holding a region expects a collection of input streams and produces a set of output streams. The operation is transformed to a new handshake.func which is instantiated where it was originally used.

If an operation has to read external values or initialize state (buffers, memories, etc.) it will only read them if an additional ctrl signal is fired.

Here is a figure that illustrates this idea.

And an example with a split operation:

%res0, %res1 = stream.split(%in) : (!stream.stream<tuple<i32, i32>>) -> (!stream.stream<i32>, !stream.stream<i32>) {
^0(%val: tuple<i32, i32>):
  %0, %1 = stream.unpack %val : tuple<i32, i32>
  stream.yield %0, %1 : i32, i32
}

which should be transformed to the following:

handshake.func private @stream_split(%arg0: tuple<tuple<i32, i32>, i1, none>, %arg1: none, ...) -> (tuple<i32, i1, none>, tuple<i32, i1, none>, none) {
   %0:3 = unpack %arg0 : tuple<tuple<i32, i32>, i1, none>
   %1:2 = fork [2] %0#1 : i1
   %2:2 = unpack %0#0 : tuple<i32, i32>
   %3:2 = fork [2] %0#2 : none
   %4 = pack %2#0, %1#1, %3#0 : tuple<i32, i1, none>
   %5 = pack %2#1, %1#0, %3#1 : tuple<i32, i1, none>
   return %4, %5, %arg1 : tuple<i32, i1, none>, tuple<i32, i1, none>, none
 }

Note that split has no state, and thus the ctrl signal is directly forwarded. In other cases, it might trigger some additional logic.

Open Questions / Limitations

  1. As already mentioned, it’s not clear if having such ctrl signals in tuples is supported by handshake. In general, the stream lowering, which produces heavily task-pipelined circuits, might bend some parts of the handshake dialect a bit.

  2. Having a none type in a tuple breaks the Handshake to FIRRTL lowering. This shouldn’t be too hard to fix, though. Edit: It’s not as simple as expected…

  3. How to handle the case where some streams are active and the initialization signal is triggered? Forcing a reset to be fired beforehand might not be a great idea.

Again, my hardware design background is limited, so if others have concerns or improvements, I’m happy to hear them.

Just so I understand correctly: the inclusion of the none-typed value inside the tuple is to have an additional token within the stream that indicates that the stream carries a value, which could then be fed to stateful components?

Having a none type in a tuple breaks the Handshake to FIRRTL lowering. This shouldn’t be too hard to fix, though. Edit: It’s not as simple as expected…

The semantics of that are also a bit funky - in Handshake, all top-level SSA values have implicit ready/valid signals, and none-typed SSA values will therefore only be a ready/valid signal pair. So when we have a tuple, a ready/valid pair is associated with the tuple itself, and the tuple internals are the data values. As such, a none-typed value inside a tuple would literally lower to nothing.

It might be that we need to lessen the restriction that all handshake functions have exactly one mandatory control input/output. This is only really a requirement for the DHLS model of dataflow circuits, but your model might be better suited with a signature of:

handshake.func private @stream_split(%arg0: tuple<tuple<i32, i32>, i1>, %arg1: none, ...) -> (tuple<i32, i1>, none, tuple<i32, i1>, none)

That is, each stream will have a distinct control token alongside it. In DHLS, where we model CFGs, it makes sense to only have a single control token for functions, since that control token models which basic block the program currently resides in. However the streaming model has another notion of control.


Exactly.

The semantics of that are also a bit funky - in Handshake, all top-level SSA values have implicit ready/valid signals, and none-typed SSA values will therefore only be a ready/valid signal pair. So when we have a tuple, a ready/valid pair is associated with the tuple itself, and the tuple internals are the data values. As such, a none-typed value inside a tuple would literally lower to nothing.

I figured this out while investigating the cause of the crash. Indeed, providing a separate control token for each stream seems to be the way to go. I’ll try to adapt my current implementation to follow this approach.

It might be that we need to lessen the restriction that all handshake functions have exactly one mandatory control input/output.

I can file an issue and try to implement such a change. If there are any unforeseen complications, we can discuss them in the corresponding issue.
Handshake does not enforce this property, so we can already use multiple control tokens if we want to.


I’m not very familiar with Handshake or FIRRTL, so I don’t know how this would affect interacting/lowering with those dialects.

I agree that a stream should only have one writer. That fits best with the syntax and is pretty easy to reason about.

I like the design choice of streams being single-reader. I think reading a stream is really more like consuming (with the current semantics where most operations modify the stream). Multiple consumers should be handled through forking/copying of some kind. That works well for some cases (e.g. when you know exactly which consumer to use for each stream element so you can easily filter), but multiple competing consumers may require more coordination (which I think should be an explicit choice of the user/coder of how to write this coordination). Maybe the stream could be consumed by one handler which then interfaces with the consumers as needed (possibly involving more Handshake logic).

My initial thoughts of how to compose streams would be to assume that each stream is SSA-like where passing a stream to a map then yields another stream which is logically distinct from the first stream (even if a hardware implementation may involve reusing the first stream somehow). If one were to reuse the first logical stream later, then the analysis passes would realize this and have to make a copy of the first stream (assuming the map operation on the first stream destroyed/consumed it).

I reason about them like this because I imagine that if a stream is like a bunch of data moving along a wire somewhere (or even something like an array passed to a function), then if some function or thing uses that stream of data without explicitly passing it along, then it’s consuming that stream and destroying it. Maybe it makes a new stream very similar to the old one, but the old one is gone unless we explicitly pass it along to somewhere else. Does that make sense?

Maybe I’m misunderstanding the assumptions/semantics of these streams, but what’s stopping us from making copies if we need to make multiple streams from one? For example, if you wanted to fork a stream and create two streams (with subsets of the elements of the first), then you could copy the first stream into two new streams then apply filters to both new streams. This is a software-style approach to it (e.g. make two copies of an array then remove elements from both), but I think it could be a solution for now to these problems.

One could introduce a copy or fork operation which takes in one input stream and outputs two or more (separate) streams identical to the input stream. Would this be useful? Is this different from the forking mechanism you mentioned?


One could introduce a copy or fork operation which takes in one input stream and outputs two or more (separate) streams identical to the input stream. Would this be useful? Is this different from the forking mechanism you mentioned?

When I was mentioning multiple consumers, I assumed that each element would be provided to all consumers, which is basically an implicit fork. But I do agree that making explicit copies might be a better idea. Implementing such a forking operation should be straightforward, as it can be lowered directly to handshake.fork.

Having multiple consumers where each element is only forwarded to one operation might be something nice to have, but in this case an explicit distribute/scatter operation should be used beforehand. There are different ways of splitting a stream, which one could define on the scatter operation.
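For example, a round-robin distribution policy could hypothetically be expressed like this (illustrative syntax; neither the op nor its policy attribute exists in the dialect yet):

```mlir
// alternate elements of %in between two output streams;
// other policies could be defined via the attribute
%s0, %s1 = stream.scatter(%in) {policy = "round_robin"} : (!stream.stream<i64>) -> (!stream.stream<i64>, !stream.stream<i64>)
```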


+1 to stream dialect. Just giving my two cents here.

The stream dialect sounds very interesting, and FWIW I’d like this dialect to be added to the MLIR subproject instead of the CIRCT project. Streams in many cases have different algorithms than normal bounded data structures, and streams are not specific to hardware: data streams are extensively used in big-data mining, and having a core MLIR dialect would be useful to any DSL targeting that (I’m not sure if there are many, but I think there must be some).

I’d like to add that I’m not familiar with CIRCT; however, I have been playing around with MLIR for quite some time now. Also, I’d be interested in contributing to this dialect irrespective of whether it gets added to CIRCT or MLIR.


The stream dialect sounds very interesting, and FWIW I’d like this dialect to be added to the MLIR subproject instead of the CIRCT project.

I agree. Adding the dialect to MLIR would enable many more targets while preserving the possibility of working with it from CIRCT. I already discussed the possibility of targeting CPUs and/or GPUs from the streaming dialect with some other people, but we haven’t started implementing anything in this direction yet.
I’m not sure what would be required to land a new dialect in MLIR. While implementing lowerings to the lower-level MLIR dialects would be interesting, I’m a bit on a tight schedule, as this is part of a thesis project.

Should we maybe open a new RFC to discuss this topic with the MLIR community as well? Going through the lowering-related answers above might be distracting for the broader community.

Also, I’d be interested in contributing to this dialect irrespective of whether it gets added to circt or mlir.

That’s great to hear :smile:. Feel free to contact me to coordinate such a collaboration.


I too don’t know about that. Of course, this could enter CIRCT right now and maybe move to MLIR later.

Yes, that works too. It would be interesting to see what the MLIR folks think of this dialect. As I mentioned above, we can delay that discussion too. It would be nice if this were upstreamed with the thought, at the back of our minds, of possibly targeting general streaming DSLs in the future instead of limiting it to just hardware. It seems you have already been thinking about targeting CPUs/GPUs, so that’s great :slight_smile:

Hi @Dinistro! I was catching up on my email and wanted to comment on this: it looks quite interesting. It seems like you’re thinking about this in a ‘functional programming’ sense, where streams can be created and operations applied to them. This is an interesting programming abstraction in its own right, which is similar to the handshake dialect but has some important differences. In the Handshake dialect I usually think about processing an ‘infinite’ stream that never ends. This is a nice representation of hardware because it ‘exists’ forever. I imagine that mapping a program with a bunch of finite streams onto the handshake dialect as infinite streams with appropriate signalling could be a challenging problem!

One thing I’ve wanted to add to the handshake dialect is a way of having a coarse-grained process that is defined using a sequential region of code. e.g. something like:

func @mymux(!handshake.stream control, !handshake.stream in1, !handshake.stream in2) {
%1 = handshake.read(control)
if(%1) return handshake.read(in1)
else return handshake.read(in2)
}

Then we could replace the primitive

%0 = mux %select [%data0, %data1] : index, i32

with

%0 = handshake.process @mymux %select [%data0, %data1] : index, i32

This matches nicely with the hls_stream library in Vitis HLS/Vivado HLS. Have you thought about something like this for your stream dialect?

Steve

Thanks for the reply! My current approach can support both finite and infinite streams by having an “end-of-stream” signal that is fired once a stream is done. If this is never fired, then the pipeline shouldn’t have any issues. Some operators, e.g., reduce, only make sense on finite streams, as they accumulate over the full stream. Having operations that cut out finite substreams from an infinite input is thus worth considering as well.

One thing I’ve wanted to add to the handshake dialect is a way of having a coarse-grained process that is defined using a sequential region of code.

I didn’t really think about handshake extensions as of yet, as most higher-level constructs can somehow be represented in handshake.

The example you show can be represented with a element-wise merge, followed by a filtering operation. The main difference to Vitis’ hls_stream library is that the dialect has no way of explicitly reading from or writing to a stream. Instead, we try to model higher-level streaming concepts and make a deliberate step away from a library design in favor of a DSL like approach. I hope that enforcing a certain structure of the input language simplifies optimizations and lowerings.