ExecuteGroovyScript 2.0.0

Bundle
org.apache.nifi | nifi-groovyx-nar
Description
Experimental Extended Groovy script processor. The script is responsible for handling the incoming flow file (e.g., transferring it to SUCCESS or removing it) as well as any flow files created by the script. If the handling is incomplete or incorrect, the session will be rolled back.
Tags
groovy, groovyx, script
Input Requirement
ALLOWED
Supports Sensitive Dynamic Properties
false
  • Additional Details for ExecuteGroovyScript 2.0.0

    Groovy

    Summary

    This is the grooviest groovy script :)

    Script Bindings:

    variable type description
    session org.apache.nifi.processor.ProcessSession the session that is used to get, change, and transfer input files
    context org.apache.nifi.processor.ProcessContext the processor context (rarely needed)
    log org.apache.nifi.logging.ComponentLog the logger for this processor instance
    REL_SUCCESS org.apache.nifi.processor.Relationship the success relationship
    REL_FAILURE org.apache.nifi.processor.Relationship the failure relationship
    CTL java.util.HashMap<String,ControllerService> Map populated with controller services defined with `CTL.*` processor properties.
    Properties prefixed with `CTL.` can be linked to a controller service, giving the script access to that service without additional code.
    SQL java.util.HashMap<String, groovy.sql.Sql> Map populated with `groovy.sql.Sql` objects connected to the corresponding databases defined with `SQL.*` processor properties.
    Properties prefixed with `SQL.` can only be linked to a DBCPService.
    RecordReader java.util.HashMap<String,RecordReaderFactory> Map populated with controller services defined with `RecordReader.*` processor properties.
    The `RecordReader.` prefixed properties are to be linked to RecordReaderFactory controller service instances.
    RecordWriter java.util.HashMap<String,RecordSetWriterFactory> Map populated with controller services defined with `RecordWriter.*` processor properties.
    The `RecordWriter.` prefixed properties are to be linked to RecordSetWriterFactory controller service instances.
    Dynamic processor properties org.apache.nifi.components.PropertyDescriptor All processor properties not prefixed with `CTL.`, `SQL.`, `RecordReader.`, or `RecordWriter.` are bound to script variables
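As a sketch of how these bindings fit together, the snippet below assumes a processor property `CTL.lookup` linked to some controller service and a dynamic property named `maxSize` (both names are hypothetical); since dynamic properties are bound as PropertyDescriptor variables, their values are evaluated through the context:

```groovy
// Hypothetical setup: a dynamic property `maxSize` (may use Expression Language)
// and a property `CTL.lookup` linked to a controller service.

// Dynamic properties are bound as PropertyDescriptor variables;
// evaluate them against the current flow file via the context:
def max = context.getProperty(maxSize).evaluateAttributeExpressions(flowFile).getValue()

// Controller services linked through `CTL.*` properties are available by key;
// the concrete type depends on the linked service:
def service = CTL.lookup

flowFile.'max.size' = max
REL_SUCCESS << flowFile
```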

    SQL map details

    Example: if you define a property `SQL.mydb` and link it to any DBCPService, you can access it from the script as SQL.mydb.rows('select * from mytable')

    The processor automatically obtains a connection from the DBCP service before executing the script and handles the transaction for you:
    database transactions are automatically rolled back on a script exception and committed on success.
    Alternatively, you can manage transactions manually.
    NOTE: The script must not close or disconnect the connection.
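For manual transaction management, a minimal sketch (table and column names are illustrative, assuming a property `SQL.db` linked to a DBCPService) can use `groovy.sql.Sql`'s `withTransaction`, which commits when the closure completes normally and rolls back if it throws:

```groovy
// Manual transaction control: both statements commit together,
// or neither does if an exception is thrown inside the closure.
SQL.db.withTransaction {
    SQL.db.executeUpdate("update mytable set processed = 1 where id = ?", [42])
    SQL.db.executeInsert("insert into audit(msg) values (?)", ['processed id 42'])
}
```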

    SessionFile - flow file extension

    The SessionFile (org.apache.nifi.processors.groovyx.flow.SessionFile) is the actual object returned by the session in the Extended Groovy processor.
    This flow file is a container that references the session and the real flow file.
    This allows you to use a simplified syntax to work with file attributes and content:

    set new attribute value

    flowFile.ATTRIBUTE_NAME = ATTRIBUTE_VALUE
    flowFile.'mime.type' = 'text/xml'
    flowFile.putAttribute("ATTRIBUTE_NAME", ATTRIBUTE_VALUE)
    // the same as
    flowFile = session.putAttribute(flowFile, "ATTRIBUTE_NAME", ATTRIBUTE_VALUE)
    

    remove attribute

    flowFile.ATTRIBUTE_NAME = null
    // equals to
    flowFile = session.removeAttribute(flowFile, "ATTRIBUTE_NAME")
    

    get attribute value

    String a = flowFile.ATTRIBUTE_NAME
    

    write content

    flowFile.write("UTF-8", "THE CharSequence to write into flow file replacing current content")
    flowFile.write("UTF-8") { writer ->
        // do something with java.io.Writer...
    }
    flowFile.write { outStream ->
        // do something with output stream...
    }
    flowFile.write { inStream, outStream ->
        // do something with input and output streams...
    }
    

    get content

    InputStream i = flowFile.read()
    def json = new groovy.json.JsonSlurper().parse(flowFile.read())
    String text = flowFile.read().getText("UTF-8")
    

    transfer flow file to success relationship

    REL_SUCCESS << flowFile
    flowFile.transfer(REL_SUCCESS)
    // the same as:
    session.transfer(flowFile, REL_SUCCESS)
    
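Putting the pieces above together, a minimal end-to-end script might look like the following sketch (the attribute name and charset are illustrative): it fetches the incoming flow file, reads its content, sets an attribute, and routes to failure on error.

```groovy
def flowFile = session.get()
if (!flowFile) return

try {
    // read the current content and store its length as an attribute
    String text = flowFile.read().getText("UTF-8")
    flowFile.'text.length' = text.length() as String
    REL_SUCCESS << flowFile
} catch (Exception e) {
    log.error("processing failed", e)
    REL_FAILURE << flowFile
}
```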

    work with dbcp

    import groovy.sql.Sql
    
    // define property named `SQL.db` connected to a DBCPConnectionPool controller service
    // for this case it's an H2 database example
    
    // read value from the database with prepared statement
    // and assign into flowfile attribute `db.yesterday`
    def daysAdd = -1
    def row = SQL.db.firstRow("select dateadd('DAY', ${daysAdd}, sysdate) as DB_DATE from dual")
    flowFile.'db.yesterday' = row.DB_DATE
    
    // to work with BLOBs and CLOBs in the database
    // use parameter casting using groovy.sql.Sql.BLOB(Stream) and groovy.sql.Sql.CLOB(Reader)
    
    // write content of the flow file into database blob
    flowFile.read { rawIn ->
        def parms = [
            p_id  : flowFile.ID as Long, // get flow file attribute named `ID`
                p_data: Sql.BLOB(rawIn),   // use input stream as BLOB sql parameter
        ]
        SQL.db.executeUpdate(parms, "update mytable set data = :p_data where id = :p_id")
    }
    

    Handling processor start & stop

    In the Extended Groovy processor you can catch `start`, `stop`, and `unscheduled` events by providing the corresponding static methods:

    import org.apache.nifi.processor.ProcessContext
    import java.util.concurrent.atomic.AtomicLong
    
    class Const {
        static Date startTime = null;
        static AtomicLong triggerCount = null;
    }
    
    static onStart(ProcessContext context) {
        Const.startTime = new Date()
        Const.triggerCount = new AtomicLong(0)
        println "onStart $context ${Const.startTime}"
    }
    
    static onStop(ProcessContext context) {
        def alive = (System.currentTimeMillis() - Const.startTime.getTime()) / 1000
        println "onStop $context executed ${Const.triggerCount} times during ${alive} seconds"
    }
    
    static onUnscheduled(ProcessContext context) {
        def alive = (System.currentTimeMillis() - Const.startTime.getTime()) / 1000
        println "onUnscheduled $context executed ${Const.triggerCount} times during ${alive} seconds"
    }
    
    flowFile.'trigger.count' = Const.triggerCount.incrementAndGet()
    REL_SUCCESS << flowFile
    
Properties
Dynamic Properties
Restrictions
Required Permission Explanation
execute code Provides an operator the ability to execute arbitrary code, assuming all permissions that NiFi has.
Relationships
Name Description
success FlowFiles that were successfully processed
failure FlowFiles that failed to be processed
See Also