#//////////////////////////////////////////////////////////// #// imports go here #//////////////////////////////////////////////////////////// from org.apache.nifi.processor import Processor,Relationship from java.lang import Throwable
classJythonProcessor(Processor): REL_SUCCESS = Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build() REL_FAILURE = Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build() log = None e = E() definitialize(self,context): self.log = context.logger defgetRelationships(self): returnset([self.REL_SUCCESS, self.REL_FAILURE]) defvalidate(self,context): pass defonPropertyModified(self,descriptor, oldValue, newValue): pass defgetPropertyDescriptors(self): return [] defgetIdentifier(self): returnNone defonTrigger(self,context, sessionFactory): session = sessionFactory.createSession() try: self.e.executeScript(session, context, self.log, self.REL_SUCCESS, self.REL_FAILURE) session.commit() except Throwable, t: self.log.error('{} failed to process due to {}; rolling back session', [self, t]) session.rollback(true) raise t #end class
// Single outbound route: flow files whose content and/or filename were
// set successfully.
def REL_SUCCESS = new Relationship.Builder()
        .name('success')
        .description('The flow file with the specified content and/or filename was successfully transferred')
        .build()
// Optional body for the generated flow file; may contain NiFi Expression
// Language (accepted as-is, see CONTENT_HAS_EL for evaluation).
def CONTENT = new PropertyDescriptor.Builder()
        .name('File Content')
        .description('The content for the generated flow file')
        .required(false)
        .expressionLanguageSupported(true)
        .addValidator(Validator.VALID)
        .build()

// Switch controlling whether Expression Language inside the content is
// evaluated before writing; defaults to leaving the content untouched.
def CONTENT_HAS_EL = new PropertyDescriptor.Builder()
        .name('Evaluate Expressions in Content')
        .description('Whether to evaluate NiFi Expression Language constructs within the content')
        .required(true)
        .allowableValues('true', 'false')
        .defaultValue('false')
        .build()

// Optional value for the outgoing flow file's 'filename' attribute.
def FILENAME = new PropertyDescriptor.Builder()
        .name('Filename')
        .description('The name of the flow file to be stored in the filename attribute')
        .required(false)
        .expressionLanguageSupported(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build()

// Nothing to set up when the processor is initialized.
@Override
void initialize(ProcessorInitializationContext context) {
}
// Everything this processor emits goes to the single 'success' relationship.
@Override
Set<Relationship> getRelationships() {
    [REL_SUCCESS] as Set
}
// Properties

// Controller service that deserializes the incoming flow-file content
// into records.
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
        .name("record-reader")
        .displayName("Record Reader")
        .description("Specifies the Controller Service to use for reading incoming data")
        .identifiesControllerService(RecordReaderFactory.class)
        .required(true)
        .build()

// Controller service that serializes the processed records back out.
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
        .name("record-writer")
        .displayName("Record Writer")
        .description("Specifies the Controller Service to use for writing out the records")
        .identifiesControllerService(RecordSetWriterFactory.class)
        .required(true)
        .build()
// Relationships: processed flow files go to success, anything that raises
// during processing goes to failure.
def REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()
def REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles are routed here if an error occurs during processing')
        .build()
// Record pass-through/conversion body: stream records from the configured
// reader into the configured writer, copying the resulting record count and
// MIME type onto the outgoing flow file. NOTE(review): the enclosing method
// header is not visible in this fragment - 'context', 'session' and
// 'flowFile' are presumably supplied by the surrounding onTrigger; confirm.
def readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory)
def writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory)

final Map<String, String> attributes = new HashMap<>()
final AtomicInteger recordCount = new AtomicInteger()
final FlowFile original = flowFile
final Map<String, String> originalAttributes = flowFile.attributes
try {
    flowFile = session.write(flowFile, { inStream, outStream ->
        def reader = readerFactory.createRecordReader(originalAttributes, inStream, getLogger())
        try {
            // Get the first record and process it before we create the Record
            // Writer. We do this so that if the Processor updates the Record's
            // schema, we can provide an updated schema to the Record Writer.
            // If there are no records, we simply create the Writer with the
            // Reader's schema and begin & end an empty RecordSet.
            def firstRecord = reader.nextRecord()
            if (!firstRecord) {
                def writeSchema = writerFactory.getSchema(originalAttributes, reader.schema)
                def writer = writerFactory.createWriter(getLogger(), writeSchema, outStream)
                try {
                    writer.beginRecordSet()
                    def writeResult = writer.finishRecordSet()
                    attributes['record.count'] = String.valueOf(writeResult.recordCount)
                    attributes[CoreAttributes.MIME_TYPE.key()] = writer.mimeType
                    attributes.putAll(writeResult.attributes)
                    recordCount.set(writeResult.recordCount)
                } finally {
                    writer.close()
                }
                return
            }

            /////////////////////////////////////////
            // TODO process first record
            /////////////////////////////////////////

            def writeSchema = writerFactory.getSchema(originalAttributes, firstRecord.schema)
            def writer = writerFactory.createWriter(getLogger(), writeSchema, outStream)
            try {
                writer.beginRecordSet()
                writer.write(firstRecord)

                def record
                while (record = reader.nextRecord()) {
                    //////////////////////////////////////////
                    // TODO process next record
                    //////////////////////////////////////////
                    // BUG FIX: original wrote the undefined variable
                    // 'processed' (MissingPropertyException at runtime).
                    // Until processing is added, pass the record through
                    // unchanged - consistent with firstRecord above.
                    writer.write(record)
                }

                def writeResult = writer.finishRecordSet()
                attributes.put('record.count', String.valueOf(writeResult.recordCount))
                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.mimeType)
                attributes.putAll(writeResult.attributes)
                recordCount.set(writeResult.recordCount)
            } finally {
                writer.close()
            }
        } catch (final SchemaNotFoundException e) {
            throw new ProcessException(e.localizedMessage, e)
        } catch (final MalformedRecordException e) {
            throw new ProcessException('Could not parse incoming data', e)
        } finally {
            reader.close()
        }
    } as StreamCallback)
} catch (final Exception e) {
    getLogger().error('Failed to process {}; will route to failure', [flowFile, e] as Object[])
    session.transfer(flowFile, REL_FAILURE);
    return;
}

flowFile = session.putAllAttributes(flowFile, attributes)
// Empty outputs are dropped; anything with at least one record is transferred.
recordCount.get() ? session.transfer(flowFile, REL_SUCCESS) : session.remove(flowFile)
def count = recordCount.get()
session.adjustCounter('Records Processed', count, false)
getLogger().info('Successfully converted {} records for {}', [count, flowFile] as Object[])
    }
}