Mule Enum in Flow – issue with enum of same name as variable

When creating a flow in Mule recently, I hit an issue that kept me scratching my head for a number of hours, so I thought I’d share it…

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:servlet="http://www.mulesoft.org/schema/mule/servlet" xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans" version="CE-3.5.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
http://www.mulesoft.org/schema/mule/servlet http://www.mulesoft.org/schema/mule/servlet/current/mule-servlet.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

    <flow name="input-data" doc:name="input-data">
        <composite-source doc:name="Composite Source">
            <vm:inbound-endpoint exchange-pattern="one-way" path="pvoutput-data" doc:name="vm-pvoutput-data"/>
            <servlet:inbound-endpoint path="/pvoutput-data" responseTimeout="10000" doc:name="servlet-pvoutput-data"/>
        </composite-source>
        <set-variable variableName="Received" value="#[new java.util.GregorianCalendar()]" doc:name="Received Date Time"/>
        <set-variable variableName="DataSource" value="#[DataSource.valueOf(message.inboundProperties['X-Data-Source'])]" doc:name="Source"/>
        <http:body-to-parameter-map-transformer doc:name="Body to Parameter Map"/>
        <logger message="#[message]" level="DEBUG" doc:name="Logger"/>
        <logger message="Received #[DataSource] Data: #[payload]" level="INFO" doc:name="Logger"/>
        <choice doc:name="Source Choice">
            <when expression="#[DataSource == DataSource.HOT_WATER]">
                <expression-transformer doc:name="Hot Water" expression="#[new HotWaterData(Received, payload);]" />
            </when>
            <when expression="#[DataSource == DataSource.WEATHER]">
                <expression-transformer doc:name="Weather" expression="#[new WeatherData(Received, payload);]" />
            </when>
            <when expression="#[DataSource == DataSource.INVERTER]">
                <expression-transformer doc:name="Inverter" expression="#[new GenerationData(Received, payload);]" />
            </when>
            <when expression="#[DataSource == DataSource.METER]">
                <expression-transformer doc:name="Meter" expression="#[new MeterData(Received, payload);]" />
            </when>
            <otherwise>
                <logger message="Unexpected Data Source Type: #[DataSource]" level="ERROR" doc:name="Logger"/>
            </otherwise>
        </choice>
        <vm:outbound-endpoint exchange-pattern="one-way" doc:name="combine-data" path="combine-data"/>
    </flow>
</mule>

(or in pictorial format)
[Image: diagram of the "aggregator" flow]

I’ve created a simple flow which receives a request on a VM or servlet endpoint and then sets a flow variable from the X-Data-Source inbound property. The variable is named “DataSource”, but I also have an enum named DataSource which contains the following values:

package uk.co.vsf.aggregator.domain;

public enum DataSource {
	HOT_WATER,
	WEATHER,
	INVERTER,
	METER
	;
}
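
One thing worth keeping in mind with DataSource.valueOf(...) is that Java's Enum.valueOf is case-sensitive and throws IllegalArgumentException for anything that is not an exact constant name, so a header value such as Hot_Water would be rejected. A minimal sketch follows; the parse helper and the class name DataSourceParsing are hypothetical and not part of the original flow, and the enum is copied in only to keep the example self-contained.

```java
import java.util.Locale;

final class DataSourceParsing {

    // Copy of the enum from the post, nested here so the sketch is self-contained.
    enum DataSource { HOT_WATER, WEATHER, INVERTER, METER }

    // Hypothetical helper (not in the original flow): normalises the header
    // value before delegating to valueOf, which only accepts exact constant names.
    static DataSource parse(String headerValue) {
        if (headerValue == null) {
            throw new IllegalArgumentException("Missing X-Data-Source header");
        }
        return DataSource.valueOf(headerValue.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        // The normalising helper accepts mixed case.
        System.out.println(parse("Hot_Water")); // prints HOT_WATER

        // Plain valueOf does not.
        try {
            DataSource.valueOf("Hot_Water");
        } catch (IllegalArgumentException e) {
            System.out.println("valueOf rejected the value: " + e.getMessage());
        }
    }
}
```

Whether to normalise the header or insist on exact names is a design choice; the flow in this post passes the header straight to valueOf.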

There’s also a Mule configuration block which imports a number of classes, including the DataSource enum.

<configuration doc:name="Configuration">
		<expression-language autoResolveVariables="true">
			<import class="uk.co.vsf.aggregator.domain.DataSource"/>
			<import class="uk.co.vsf.aggregator.domain.HotWaterData"/>
			<import class="uk.co.vsf.aggregator.domain.WeatherData"/>
			<import class="uk.co.vsf.aggregator.domain.GenerationData"/>
			<import class="uk.co.vsf.aggregator.domain.MeterData"/>
			<import class="uk.co.vsf.aggregator.pvoutput.domain.AddStatusRequest"/>
			<import class="uk.co.vsf.aggregator.CalendarUtils"/>
			<import class="java.util.Calendar"/>
			<import class="java.util.GregorianCalendar"/>
		</expression-language>
	</configuration>

I then wrote a simple test which sends a set of values to the VM endpoint of the composite source, with a data source of HOT_WATER. You’ll notice from the flow that, after determining the data source, it logs the message, so we can see the stored variable:

Message properties:
  INVOCATION scoped properties:
    DataSource=HOT_WATER
...

All ok at the moment…
Just below that log statement is another which prints out the DataSource, but this time we get:

Received class uk.co.vsf.aggregator.domain.DataSource Data: {t=40.56, i=10}

Not what I’d expected!
And after that log statement the data source is used to determine the route of the message being processed. I expect it to be transformed into a HotWaterData object, but instead the message goes to the otherwise branch:

Unexpected Data Source Type: class uk.co.vsf.aggregator.domain.DataSource

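Well… after much head scratching… it was as simple as renaming the variable so that it no longer shares its name with the enum type! (DOH) Judging by the log output, the name DataSource in the expressions was resolving to the imported enum class rather than to the flow variable, so the comparisons against DataSource.HOT_WATER and friends could never match.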
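
The behaviour can be pictured with a toy model in Java. This is only a sketch of the symptom seen in the logs, under the assumption that imported class names are looked up before flow variables; it is not how Mule or its expression language is actually implemented, and the class NameResolutionSketch and the replacement variable name dataSourceType are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class NameResolutionSketch {

    // Copy of the enum from the post, nested here so the sketch is self-contained.
    enum DataSource { HOT_WATER, WEATHER, INVERTER, METER }

    private final Map<String, Class<?>> imports = new HashMap<>();
    private final Map<String, Object> flowVariables = new HashMap<>();

    // Registers a class under its simple name, like an <import class="..."/>.
    void importClass(Class<?> type) {
        imports.put(type.getSimpleName(), type);
    }

    // Stores a flow variable, like <set-variable variableName="..."/>.
    void setVariable(String name, Object value) {
        flowVariables.put(name, value);
    }

    // Assumption for this sketch: imports win over variables of the same name.
    Object resolve(String identifier) {
        if (imports.containsKey(identifier)) {
            return imports.get(identifier);
        }
        return flowVariables.get(identifier);
    }

    public static void main(String[] args) {
        NameResolutionSketch context = new NameResolutionSketch();
        context.importClass(DataSource.class);

        // Variable named like the enum type: the class shadows it.
        context.setVariable("DataSource", DataSource.HOT_WATER);
        System.out.println(context.resolve("DataSource") == DataSource.HOT_WATER); // prints false

        // Variable with a distinct name: the stored enum constant comes back.
        context.setVariable("dataSourceType", DataSource.HOT_WATER);
        System.out.println(context.resolve("dataSourceType") == DataSource.HOT_WATER); // prints true
    }
}
```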

Now I get the results I’d expected:

Received HOT_WATER Data: {t=40.56, i=10}

And the returned object is an instance of HotWaterData.

Hope this helps someone else!

Home Monitoring (home made) – Java Code

There are only 6 Java classes, 1 properties file and the flow (XML) file making up the Mule application. Some of them could be better written and should be unit tested, but they work and do the job :-))

ConsumptionData.java

package uk.co.vsf.pvoutputaggregator.domain;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Map;

public class ConsumptionData extends DefaultReadingData {

	private BigDecimal milliSecondsInHour = new BigDecimal("3600000");

	private BigDecimal importWatts;
	private BigDecimal exportWatts;
	private BigDecimal msBetweenCalls;

	public ConsumptionData(@SuppressWarnings("rawtypes") Map data) {
		super((String) data.get("d"), (String) data.get("t"));
		importWatts = new BigDecimal(((String) data.get("v4")).trim());
		exportWatts = new BigDecimal(((String) data.get("v9")).trim());
		msBetweenCalls = new BigDecimal(((String) data.get("msBetweenCalls")).trim());
	}

	public BigDecimal getImportWattHours() {
		return importWatts.divide(getWattHoursDivisor(), 0, RoundingMode.HALF_EVEN);
	}

	private BigDecimal getWattHoursDivisor() {
		return msBetweenCalls.divide(milliSecondsInHour, 5, RoundingMode.HALF_EVEN);
	}

	public BigDecimal getExportWattHours() {
		if (exportWatts.compareTo(BigDecimal.ZERO) == 0) {
			return BigDecimal.ZERO;
		}
		return exportWatts.divide(getWattHoursDivisor(), 0, RoundingMode.HALF_EVEN);
	}

}
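
To make the arithmetic in ConsumptionData easier to follow, here is a standalone sketch of the same two divisions with made-up inputs (a reading of 10 and 60000 ms between calls). The class name WattHoursDivisorExample and the input values are assumptions for illustration only; the scales and rounding mode are the ones used above.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class WattHoursDivisorExample {

    private static final BigDecimal MS_IN_HOUR = new BigDecimal("3600000");

    // Fraction of an hour covered by the interval, kept to 5 decimal places.
    static BigDecimal divisor(BigDecimal msBetweenCalls) {
        return msBetweenCalls.divide(MS_IN_HOUR, 5, RoundingMode.HALF_EVEN);
    }

    // Same operation as getImportWattHours(): reading / divisor, rounded to a whole number.
    static BigDecimal scale(BigDecimal reading, BigDecimal msBetweenCalls) {
        return reading.divide(divisor(msBetweenCalls), 0, RoundingMode.HALF_EVEN);
    }

    public static void main(String[] args) {
        BigDecimal interval = new BigDecimal("60000"); // one minute, hypothetical
        System.out.println(divisor(interval));                       // prints 0.01667
        System.out.println(scale(new BigDecimal("10"), interval));   // prints 600
    }
}
```

Because the divisor is rounded to 5 decimal places, an interval of only a few milliseconds would round to zero and the second division would throw ArithmeticException; that does not matter as long as the readings arrive seconds or minutes apart.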

DefaultReadingData.java

package uk.co.vsf.pvoutputaggregator.domain;


class DefaultReadingData {

	private String date;
	private String time;
	
	public DefaultReadingData(String date, String time) {
		this.date = date;
		this.time = time;
	}
	
	public String getDateTime()
	{
		return "d=" + date + "&t=" + time;
	}

	public String getDate() {
		return date;
	}

	public String getTime() {
		return time;
	}
}

HotWaterData.java

package uk.co.vsf.pvoutputaggregator.domain;

import java.util.Map;

public class HotWaterData extends DefaultReadingData {

	private String hotWaterTemperature;
	private String immersionOn;

	public HotWaterData(@SuppressWarnings("rawtypes") Map data) {
		super((String) data.get("d"), (String) data.get("t"));
		hotWaterTemperature = (String) data.get("v7");
		immersionOn = (String) data.get("v8");
	}

	public String getHotWaterTemperature() {
		return hotWaterTemperature;
	}

	public String getImmersionOn() {
		return immersionOn;
	}
}

ConsumptionInputDataTransformer.java

package uk.co.vsf.pvoutputaggregator.transformer;

import java.util.Map;

import org.mule.api.MuleMessage;
import org.mule.api.transformer.TransformerException;
import org.mule.transformer.AbstractMessageTransformer;

import uk.co.vsf.pvoutputaggregator.domain.ConsumptionData;

public class ConsumptionInputDataTransformer extends AbstractMessageTransformer {

	@Override
	public Object transformMessage(MuleMessage message, String outputEncoding)
			throws TransformerException {
		@SuppressWarnings("rawtypes")
		Map data = (Map) message.getPayload();
		// Use the reading's date and time as the correlation id so the aggregator can pair it up
		ConsumptionData consumptionData = new ConsumptionData(data);
		message.setCorrelationId(consumptionData.getDateTime());
		message.setPayload(consumptionData);
		return message;
	}

}

ExtraDataOutputTransformer.java

package uk.co.vsf.pvoutputaggregator.transformer;

import java.util.List;

import org.mule.api.MuleMessage;
import org.mule.api.transformer.TransformerException;
import org.mule.transformer.AbstractMessageTransformer;

import uk.co.vsf.pvoutputaggregator.domain.ConsumptionData;
import uk.co.vsf.pvoutputaggregator.domain.HotWaterData;

public class ExtraDataOutputTransformer extends AbstractMessageTransformer {

	@SuppressWarnings("rawtypes")
	@Override
	public Object transformMessage(MuleMessage message, String outputEncoding) throws TransformerException {
		StringBuffer pvoutputPostData = new StringBuffer(message.getCorrelationId());

		if (message.getPayload() instanceof List) {
			handleAggregatedData(pvoutputPostData, (List) message.getPayload());
		} else if (message.getPayload() instanceof ConsumptionData) {
			addConsumptionData(pvoutputPostData, (ConsumptionData) message.getPayload());
		} else {
			throw new UnsupportedOperationException();
		}

		message.setPayload(pvoutputPostData.toString());
		return message;
	}

	private void handleAggregatedData(StringBuffer pvoutputPostData, @SuppressWarnings("rawtypes") List payloadData) {
		for (Object instance : payloadData) {
			if (instance instanceof HotWaterData) {
				addHotWaterData(pvoutputPostData, (HotWaterData) instance);
			} else if (instance instanceof ConsumptionData) {
				addConsumptionData(pvoutputPostData, (ConsumptionData) instance);
			}
		}
	}

	// private void addConsumptionData(StringBuffer stringBuffer,
	// ConsumptionData consumptionData) {
	// stringBuffer.append("&v4=" +
	// consumptionData.getImportWattHours().toPlainString());
	// stringBuffer.append("&v9=" +
	// consumptionData.getExportWattHours().toPlainString());
	// }

	private void addConsumptionData(StringBuffer stringBuffer, ConsumptionData consumptionData) {
		stringBuffer.append("&v4=" + consumptionData.getImportWattHours().toPlainString());
		stringBuffer.append("&v2=" + consumptionData.getExportWattHours().toPlainString());
		stringBuffer.append("&n=1");
	}

	private void addHotWaterData(StringBuffer stringBuffer, HotWaterData hotWaterData) {
		stringBuffer.append("&v11=" + hotWaterData.getHotWaterTemperature());
		stringBuffer.append("&v10=" + hotWaterData.getImmersionOn());
	}
}

HotWaterInputDataTransformer.java

package uk.co.vsf.pvoutputaggregator.transformer;

import java.util.Map;

import org.mule.api.MuleMessage;
import org.mule.api.transformer.TransformerException;
import org.mule.transformer.AbstractMessageTransformer;

import uk.co.vsf.pvoutputaggregator.domain.HotWaterData;

public class HotWaterInputDataTransformer extends AbstractMessageTransformer {

	@Override
	public Object transformMessage(MuleMessage message, String outputEncoding)
			throws TransformerException {
		@SuppressWarnings("rawtypes")
		Map data = (Map) message.getPayload();
		HotWaterData hotWaterData = new HotWaterData(data);
		message.setCorrelationId(hotWaterData.getDateTime());
		message.setPayload(hotWaterData);
		return message;
	}

}

pvoutputaggregator.properties

correlation.aggregator.timeout=180000

http.endpoint.hotwater.port=21010
http.endpoint.hotwater.host=localhost
http.endpoint.hotwater.path=pvoutput-post-hotwater

http.endpoint.consumption.port=22010
http.endpoint.consumption.host=localhost
http.endpoint.consumption.path=pvoutput-post-consumption

X-Pvoutput-Apikey=REPLACE THIS
X-Pvoutput-SystemId=REPLACE THIS

pvoutput.endpoint=http://pvoutput.org/service/r2/addstatus.jsp
pvoutput.sentDataCopy.path=/pvoutputaggregator

pvoutputaggregator.mflow

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns:context="http://www.springframework.org/schema/context" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans" version="EE-3.4.1"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd">
    <vm:endpoint exchange-pattern="one-way" path="ExtraDataQ" name="ExtraDataVmEndpoint" doc:name="VM"/>
    <http:connector name="GenericHttpConnector" cookieSpec="netscape" validateConnections="true" sendBufferSize="0" receiveBufferSize="0" receiveBacklog="0" clientSoTimeout="10000" serverSoTimeout="10000" socketSoLinger="0" doc:name="HTTP\HTTPS"/>
    <http:connector name="PvOutputHttpConnector" cookieSpec="netscape" validateConnections="true" sendBufferSize="0" receiveBufferSize="0" receiveBacklog="0" clientSoTimeout="10000" serverSoTimeout="10000" socketSoLinger="0" doc:name="HTTP\HTTPS"/>
    <custom-transformer class="uk.co.vsf.pvoutputaggregator.transformer.ConsumptionInputDataTransformer" name="Java" doc:name="Java"/>
    <context:property-placeholder location="classpath:/pvoutputaggregator.properties"/>
    <custom-transformer class="uk.co.vsf.pvoutputaggregator.transformer.HotWaterInputDataTransformer" name="HotWaterJava" doc:name="Java"/>
    <vm:endpoint exchange-pattern="one-way" path="PvOutputQ" name="PvOutputVmEndpoint" doc:name="VM"/>
    <file:connector name="FileConnector"  outputAppend="true" validateConnections="true" doc:name="File" autoDelete="true" streaming="true" />
    <file:endpoint path="${pvoutput.sentDataCopy.path}" outputPattern="#[function:datestamp:yyyy-MM-dd].txt" name="FileOutboundEndpoint" responseTimeout="10000" connector-ref="FileConnector" doc:name="File"/>
    <flow name="ConsumptionFlow" doc:name="ConsumptionFlow">
        <http:inbound-endpoint exchange-pattern="one-way"   doc:name="HTTP" host="${http.endpoint.consumption.host}" path="${http.endpoint.consumption.path}" port="${http.endpoint.consumption.port}" connector-ref="GenericHttpConnector"/>
        <http:body-to-parameter-map-transformer doc:name="Body to Parameter Map"/>
        <message-properties-transformer doc:name="Message Properties">
            <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2"/>
        </message-properties-transformer>
        <transformer ref="Java" doc:name="Transformer Reference"/>
        <vm:outbound-endpoint exchange-pattern="one-way"  doc:name="VM" ref="ExtraDataVmEndpoint"/>
    </flow>
    <flow name="HotWaterFlow" doc:name="HotWaterFlow">
        <http:inbound-endpoint exchange-pattern="one-way"   doc:name="HTTP"  path="${http.endpoint.hotwater.path}" host="${http.endpoint.hotwater.host}" port="${http.endpoint.hotwater.port}" connector-ref="GenericHttpConnector"/>
        <http:body-to-parameter-map-transformer doc:name="Body to Parameter Map"/>
        <message-properties-transformer doc:name="Message Properties">
            <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2"/>
        </message-properties-transformer>
        <transformer ref="HotWaterJava" doc:name="Transformer Reference"/>
        <vm:outbound-endpoint exchange-pattern="one-way"  doc:name="VM" ref="ExtraDataVmEndpoint"/>
    </flow>
    <flow name="ExtraDataToPvOutputFlow" doc:name="ExtraDataToPvOutputFlow">
        <vm:inbound-endpoint exchange-pattern="one-way"  doc:name="VM" ref="ExtraDataVmEndpoint"/>
        <collection-aggregator timeout="${correlation.aggregator.timeout}" failOnTimeout="true" doc:name="Collection Aggregator"/>
        <vm:outbound-endpoint exchange-pattern="one-way"  doc:name="VM" ref="PvOutputVmEndpoint"/>
        <catch-exception-strategy doc:name="Catch Exception Strategy">
            <vm:outbound-endpoint exchange-pattern="one-way"  doc:name="VM" ref="PvOutputVmEndpoint"/>
        </catch-exception-strategy>
    </flow>
    <flow name="PVOutputFlow" doc:name="PVOutputFlow">
        <vm:inbound-endpoint exchange-pattern="one-way"  doc:name="VM" ref="PvOutputVmEndpoint"/>
        <custom-transformer class="uk.co.vsf.pvoutputaggregator.transformer.ExtraDataOutputTransformer" doc:name="Java"/>
        <message-properties-transformer doc:name="Message Properties">
            <add-message-property key="X-Pvoutput-Apikey" value="${X-Pvoutput-Apikey}"/>
            <add-message-property key="X-Pvoutput-SystemId" value="${X-Pvoutput-SystemId}"/>
        </message-properties-transformer>
        <all doc:name="All">
            <processor-chain>
                <append-string-transformer message="&#xD;&#xA;" doc:name="Append String"/>
                <file:outbound-endpoint responseTimeout="10000" connector-ref="FileConnector" ref="FileOutboundEndpoint" doc:name="File"/>
            </processor-chain>
            <http:outbound-endpoint exchange-pattern="request-response" method="POST" address="${pvoutput.endpoint}" connector-ref="PvOutputHttpConnector" contentType="application/x-www-form-urlencoded" doc:name="HTTP"/>
        </all>
    </flow>
</mule>

Next Part

Part 7: And finally the results of all work!

Home Monitoring (home made) – Aggregation

Aggregation of the data before uploading to PVOutput

Why do we need to aggregate the data?

The data collected by the meter-reading Arduino is sent to PVOutput on the addstatus service using the standard parameters. The hot water Arduino sends its data on the extended parameters (only available to donors), but you can only use the extended parameters if one of v1, v2, v3 or v4 is also transmitted in the same request. The two Arduinos transmit their readings at different times, so in order to use the extended parameters on PVOutput, the data needs to be aggregated.

The scripts and programs for aggregation

Both Arduinos initially transmit their data to my server (called Pompeii in case you hadn’t noticed from the code). Pompeii then aggregates the data before calling PVOutput with the meter readings and additional data.

The first scripts in the chain are PHP scripts, simply because they were there first; the new aggregation service could easily replace them. Their job is to add the date and time to the data received and re-post it to Pompeii.

pvoutput-post.php

<?php
$dte = date('Ymd');
$tme = date('H:i');

$url = 'http://localhost:22010/pvoutput-post-consumption';

$import = $_POST["iw"];
$export = $_POST["ew"];
$msBetweenCalls = $_POST["msBetweenCalls"];

$myvars = 'd=' . $dte . '&t=' . $tme . '&v4=' . $import . '&v9=' . $export . '&msBetweenCalls=' . $msBetweenCalls;

//echo "Post Data: " . $myvars;

$ch = curl_init( $url );
curl_setopt( $ch, CURLOPT_POST, 1);
curl_setopt( $ch, CURLOPT_POSTFIELDS, $myvars);
curl_setopt( $ch, CURLOPT_FOLLOWLOCATION, 1);
curl_setopt( $ch, CURLOPT_RETURNTRANSFER, 1);
curl_exec( $ch );
?>

pvoutput-post-temp.php

<?php
$dte = date('Ymd');
$tme = date('H:i');

$url = 'http://localhost:21010/pvoutput-post-hotwater';

$temperature = $_POST["t"];
$immersion = $_POST["i"];
$myvars = 'd=' . $dte . '&t=' . $tme . '&v7=' . $temperature . '&v8=' . $immersion;

//echo "Post Data: " . $myvars;

$ch = curl_init( $url );
curl_setopt( $ch, CURLOPT_POST, 1);
curl_setopt( $ch, CURLOPT_POSTFIELDS, $myvars);
curl_setopt( $ch, CURLOPT_FOLLOWLOCATION, 1);
curl_setopt( $ch, CURLOPT_RETURNTRANSFER, 1);
curl_exec( $ch );
?>
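
If the PHP scripts were ever folded into the Java side, the date and time stamping could be sketched as below. PHP's date('Ymd') and date('H:i') correspond to the java.time patterns yyyyMMdd and HH:mm. The class name ReadingTimestamp and the sample date are hypothetical; the "d=...&t=..." layout mirrors what the scripts build.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class ReadingTimestamp {

    // Equivalent of PHP date('Ymd'): four-digit year, zero-padded month and day.
    private static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Equivalent of PHP date('H:i'): 24-hour clock, zero-padded hours and minutes.
    private static final DateTimeFormatter TIME = DateTimeFormatter.ofPattern("HH:mm");

    // Builds the leading part of the post data from the given moment.
    static String stamp(LocalDateTime when) {
        return "d=" + when.format(DATE) + "&t=" + when.format(TIME);
    }

    public static void main(String[] args) {
        // Fixed sample moment so the output is predictable.
        System.out.println(stamp(LocalDateTime.of(2014, 3, 9, 7, 5))); // prints d=20140309&t=07:05

        // In real use the current time would be passed instead.
        System.out.println(stamp(LocalDateTime.now()));
    }
}
```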

The next program is a Mule application which listens on two ports. Each time it receives a message it converts it to an object and puts it on a VM queue with a correlation id of the date + time. If two messages are received with the same correlation id, the next phase is to combine the data, post it to PVOutput and also write it to a file. The data is written to a file in case I decide to write my own graphing app on top of the recorded data.

[Image: diagram of the "pvoutputaggregator" flows]

There’s also an exception strategy in case the second message is never received. If the second message is not received and the first message is a meter reading, we can still send the result to PVOutput.
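
The pairing logic can be illustrated with a small in-memory model. This is a sketch of the idea only, not how Mule's collection aggregator is implemented: the class name CorrelationGrouper is hypothetical, messages are plain strings, and the timeout is simulated by calling expire by hand rather than by a timer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class CorrelationGrouper {

    private final int groupSize;
    private final Map<String, List<String>> pending = new HashMap<>();

    CorrelationGrouper(int groupSize) {
        this.groupSize = groupSize;
    }

    // Adds a message; returns the whole group once groupSize messages share the id.
    Optional<List<String>> add(String correlationId, String message) {
        List<String> group = pending.computeIfAbsent(correlationId, id -> new ArrayList<>());
        group.add(message);
        if (group.size() < groupSize) {
            return Optional.empty();
        }
        pending.remove(correlationId);
        return Optional.of(group);
    }

    // Timeout path: gives back whatever arrived for the id (possibly nothing).
    List<String> expire(String correlationId) {
        List<String> partial = pending.remove(correlationId);
        return partial == null ? new ArrayList<>() : partial;
    }

    public static void main(String[] args) {
        CorrelationGrouper grouper = new CorrelationGrouper(2);
        String id = "d=20140309&t=07:05";

        System.out.println(grouper.add(id, "meter reading").isPresent());     // prints false
        System.out.println(grouper.add(id, "hot water reading").isPresent()); // prints true

        // A lone meter reading that never gets its partner is released on expiry.
        grouper.add("d=20140309&t=07:10", "meter reading");
        System.out.println(grouper.expire("d=20140309&t=07:10")); // prints [meter reading]
    }
}
```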

There are only 6 class files for the entire app. I’ll post them in the next part so as not to clutter this page.

Next Part

Part 6: Java Code