Skip to content

Week 8a - Pipeline Design Pattern

The Pipeline Design Pattern

Classification

  • Structural Design Pattern: Patterns that ease the design by identifying a simple way to realize relationships among entities.

Pattern Definition

Pipeline Design Pattern

  • Hard Definition: A linear, one-directional flow of payloads between a chain of processing elements arranged so that the output of each element is the input of the next.
  • Easy Definition: A chain of Stages where the output of each Stage is the input to the next modifying the payload object at each stage.

Representations

TextBook Pipeline

  • StageInterface.apply(Payload) defines the unit of work
  • Pipeline chains stages so each one’s output feeds the next.
  • I partner mine with:
    • Strategy: ProcessorInterface/ProcessorType - Allows pipeline to swap out the processor allowing me to change execution (i.e. sequential, parallel, interrupt, retry, logged, etc.)
    • Composite: Since PipelineInterface extends StageInterface a Pipeline is ALSO a Stage, allowing composite pipeline relationships and even parallel pipelines.

Mermaid Graph - Pipeline Pattern

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
classDiagram
    PipelineExClient --> PipelineInterface
    StageInterface <|-- PipelineInterface : extends
    PipelineInterface <|-- Pipeline : implements
    ProcessorInterface <|-- ProcessorType : implements
    StageInterface <|-- Stage1 : implements
    StageInterface <|-- Stage2 : implements
    StageInterface --* Pipeline : Composition
    ProcessorInterface --* Pipeline : Composition
    Pipeline ..> Payload : Dependency
    ProcessorType ..> Payload : Dependency
    Stage1 ..> Payload : Dependency
    Stage2 ..> Payload : Dependency
    class StageInterface {
        &lt;&lt;interface&gt;&gt;
        +apply(Payload payload) Payload
    }
    class Stage1 {
        +apply(Payload payload) Payload
    }
    class Stage2 {
        +apply(Payload payload) Payload
    }    
    class PipelineInterface {
        &lt;&lt;interface&gt;&gt;
        +pipe(StageInterface stage) PipelineInterface
        +process(Payload payload) : Payload
    }
    class Pipeline {
        -List<StageInterface> stages
        -ProcessorInterface processor
        +Pipeline(ProcessorInterface processor, List<StageInterface> stages)
        +pipe(StageInterface stage) PipelineInterface
        +process(Payload payload) : Payload
        +apply(Payload payload) Payload
    }
    class ProcessorInterface {
        &lt;&lt;interface&gt;&gt;
        +process(Payload payload, List<StageInterface> stages) : Payload
    }  
    class ProcessorType {
        +process(Payload payload, List<StageInterface> stages) : Payload
    }
    class Payload {
        +Payload(Class<T> desiredType, Object result)
        +getResult(Class<T> desiredType) : <T> T
        +setResult(Class<T> desiredType, Object result)
    }
    class PipelineExClient {
    }

Mermaid Graph - Pipeline Pattern with Builder Pattern

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
classDiagram
    PipelineExClient --> PipelineInterface
    StageInterface <|-- PipelineInterface : extends
    PipelineInterface <|-- Pipeline : implements
    ProcessorInterface <|-- ProcessorType : implements
    StageInterface <|-- Stage1 : implements
    StageInterface <|-- Stage2 : implements
    StageInterface --* Pipeline : Composition
    ProcessorInterface --* Pipeline : Composition
    Pipeline ..> Payload : Dependency
    ProcessorType ..> Payload : Dependency
    Stage1 ..> Payload : Dependency
    Stage2 ..> Payload : Dependency
    PipelineBuilderInterface <|-- PipelineBuilder : implements
    StageInterface --* PipelineBuilder : Composition
    PipelineBuilder ..> Pipeline
    class StageInterface {
        &lt;&lt;interface&gt;&gt;
        +apply(Payload payload) Payload
    }
    class Stage1 {
        +apply(Payload payload) Payload
    }
    class Stage2 {
        +apply(Payload payload) Payload
    }    
    class PipelineInterface {
        &lt;&lt;interface&gt;&gt;
        +pipe(StageInterface stage) PipelineInterface
        +process(Payload payload) : Payload
    }
    class Pipeline {
        -List<StageInterface> stages
        -ProcessorInterface processor
        +Pipeline(ProcessorInterface processor, List<StageInterface> stages)
        +pipe(StageInterface stage) PipelineInterface
        +process(Payload payload) : Payload
        +apply(Payload payload) Payload
    }
    class ProcessorInterface {
        &lt;&lt;interface&gt;&gt;
        +process(Payload payload, List<StageInterface> stages) : Payload
    }  
    class ProcessorType {
        +process(Payload payload, List<StageInterface> stages) : Payload
    }
    class Payload {
        +Payload(Class<T> desiredType, Object result)
        +getResult(Class<T> desiredType) : <T> T
        +setResult(Class<T> desiredType, Object result)
    }
    class PipelineExClient {
    }
    class PipelineBuilderInterface {
        &lt;&lt;interface&gt;&gt;
        +add(StageInterface stage) : PipelineBuilderInterface
        +build(ProcessorInterface) : PipelineInterface
    }
    class PipelineBuilder {
        +add(StageInterface stage) : PipelineBuilderInterface
        +build(ProcessorInterface) : PipelineInterface
    }

UML

Pipeline Pattern
Pipeline Pattern with Builder Pattern

Real World Usages

Pipeline Pattern

  • UNIX/Linux/Windows shells: Think command pipes!
  • Filter Chains
  • Data processing pipelines like Talend and SSIS
  • Render farms for video processing.
  • Map/Reduce is an fine-tuned application of this pattern.

Ideas for Use

  • Making API requests and parsing the result (JSON, XML, etc).
  • Conversion tools translating CSV | JSON | SQL Query, etc.
  • MP3 channel | MIDI | Device channel
  • Host Scan (Result) | Vulnerability Map | Pen Test
  • DB Query | JSON | Message Q | API Job
  • Port Scan Job | Parallel Scan | Aggregator

Java Code Example

Main Take-Aways from ME

  • Think of the Pipeline Pattern like an assembly line that takes a payload object, modifies it, and passes it onto the next Stage.
    • The Payload object is the one primary constant in all Stages.
    • The payload is passed to each Stage of the Pipeline process until it complete or errors.
  • Stages could also be called Filters in the wild.
  • The Pipeline pattern increases readability.
Consider a bunch of nested conditions...
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public Order processOrder(Order order) {
    Order result = null;
    try {
        result = createOrder(order);
        if (result != null && "CREATED".equals(result.getStatus())) {
            try {
                result = processPayment(result);
                if (result != null && "PAID".equals(result.getStatus())) {
                    try {
                        result = sendInvoice(result);
                        if (result != null && "INVOICED".equals(result.getStatus())) {
                            try {
                                result = exportOrder(result);
                                if (result != null && "EXPORTED".equals(result.getStatus())) {
                                    log.info("Order {} processed successfully", order.getId());
                                } else {
                                    log.error("Export failed for order {}", order.getId());
                                }
                            } catch (ExportException e) {
                                log.error("Export error for order {}: {}", order.getId(), e.getMessage());
                            }
                        } else {
                            log.error("Invoice failed for order {}", order.getId());
                        }
                    } catch (InvoiceException e) {
                        log.error("Invoice error for order {}: {}", order.getId(), e.getMessage());
                    }
                } else {
                    log.error("Payment failed for order {}", order.getId());
                }
            } catch (PaymentException e) {
                log.error("Payment error for order {}: {}", order.getId(), e.getMessage());
            }
        } else {
            log.error("Create failed for order {}", order.getId());
        }
    } catch (CreateOrderException e) {
        log.error("Create error for order {}: {}", order.getId(), e.getMessage());
    }
    return result;
}
Versus
1
2
3
4
5
6
PipelineInterface pipeline = new Pipeline()
        .pipe(new CreateOrder())
        .pipe(new ProcessPayment())
        .pipe(new SendInvoice())
        .pipe(new ExportOrder());
pipeline.process(order);
  • The Pipeline pattern increases testability
  • The Pipeline patterns allow Pipeline Re-Use (By making Pipeline a Composite of Stage)
  • Handling Exceptions is trivial
  • Single responsibility per stage, making code re-use and modularity simpler (Some people might consider this Strategy).
  • Adding a Builder Pattern allows for conditional Pipelines.
  • Use this code when solving multi-stage problems or filtering.

Main OOP Principles of Pipeline Pattern

  • Single Responsibility: Since each Stage does one and only one specific thing, this adheres to the rule of Single Responsibility in the strictest sense.
  • Favors composition over inheritance: The Pipeline is composed of its Stage list and Pipeline processor rather than inheriting from them.
  • Program to interfaces not Implementations: All of the concrete classes are dependent on implementing interfaces instead of inheritance. In addition, because the Pipeline class is also a StageInterface, built pipelines can be handed as Stages to other pipelines as re-usable pipelines.