[TUT] Google Cloud IoT Core – MQTT on Android

I just started playing with Google Cloud IoT Core on Android Things, where communication is done via MQTT. However, I couldn’t find a decent, simple example of how to do it. Not for long!

This post explains how to set up communication between an Android app and Cloud IoT Core.

You will need your credentials, so make sure you have the cloud console open: https://cloud.google.com/iot-core/

Here is the gist of what is needed from you (hint: read this):

– Create a Cloud IoT Core project
– Create a Registry in your project
– Create a Device in your Registry
– Create an RSA public/private key pair
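
Google's own docs (linked at the end of this post) cover creating the key pair, so treat the following only as a rough sketch of the same idea on the JVM. The class and method names are made up for illustration. It produces the private key as Base64-encoded PKCS#8 text, which is the shape the password generator later in this post decodes, and the public key as PEM text. Registering the public key with your device in the console is still up to you.

```java
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper, not part of the example repository.
final class RsaKeyPairSketch {

    private RsaKeyPairSketch() {
    }

    /** Generates a 2048-bit RSA key pair with the JDK's default provider. */
    static KeyPair generate() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("RSA is not available on this JVM", e);
        }
    }

    /** Base64 text of the PKCS#8 private key, without any PEM header or footer. */
    static String privateKeyAsBase64(KeyPair pair) {
        return Base64.getEncoder().encodeToString(pair.getPrivate().getEncoded());
    }

    /** PEM text of the X.509 (SubjectPublicKeyInfo) public key, wrapped at 64 characters. */
    static String publicKeyAsPem(KeyPair pair) {
        String body = Base64.getMimeEncoder(64, new byte[] {'\n'})
                .encodeToString(pair.getPublic().getEncoded());
        return "-----BEGIN PUBLIC KEY-----\n" + body + "\n-----END PUBLIC KEY-----\n";
    }

    public static void main(String[] args) {
        KeyPair pair = generate();
        System.out.println(publicKeyAsPem(pair));
        // Keep this value private; it is printed here only because this is a demo.
        System.out.println(privateKeyAsBase64(pair));
    }
}
```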

Once that is done, it’s on to the coding!

If you want to jump straight to it, I would check out this repository:

https://github.com/blundell/CloudIoTCoreMQTTExample

And copy the IoTCoreCommunicator into your project.

We use a library called Eclipse Paho for the MQTT heavy lifting, so this post won’t go into those details and will just explain the Cloud IoT Core communication.

compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

Firstly, every connection has to be authenticated with a password. For Cloud IoT Core that password is a JSON Web Token (JWT), which we sign with the private key created earlier. Put the private key into your `res/raw` folder so it can be read later; the code below expects the file to contain the Base64 text of a PKCS#8 key. (Note: in production you may want to put the file somewhere a bit more secret.)

To create the password we will use this dependency:

compile 'io.jsonwebtoken:jjwt:0.7.0'

And generate it, in a class of its own like so:

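import android.content.res.Resources;
import android.util.Base64;

import java.io.IOException;
import java.io.InputStream;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.PKCS8EncodedKeySpec;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;

import io.jsonwebtoken.JwtBuilder;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;

class IotCorePasswordGenerator {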

    private final String projectId;
    private final Resources resources;
    private final int privateKeyRawFileId;

    IotCorePasswordGenerator(String projectId, Resources resources, int privateKeyRawFileId) {
        this.projectId = projectId;
        this.resources = resources;
        this.privateKeyRawFileId = privateKeyRawFileId;
    }

    char[] createJwtRsaPassword() {
        try {
            byte[] privateKeyBytes = decodePrivateKey(resources, privateKeyRawFileId);
            return createJwtRsaPassword(projectId, privateKeyBytes).toCharArray();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("Algorithm not supported. (developer error)", e);
        } catch (InvalidKeySpecException e) {
            throw new IllegalStateException("Invalid Key spec. (developer error)", e);
        } catch (IOException e) {
            throw new IllegalStateException("Cannot read private key file.", e);
        }
    }

    private static byte[] decodePrivateKey(Resources resources, int privateKeyRawFileId) throws IOException {
        try(InputStream inStream = resources.openRawResource(privateKeyRawFileId)) {
            return Base64.decode(inputToString(inStream), Base64.DEFAULT);
        }
    }

    private static String inputToString(InputStream is) {
        java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
        return s.hasNext() ? s.next() : "";
    }

    private static String createJwtRsaPassword(String projectId, byte[] privateKeyBytes) throws NoSuchAlgorithmException, InvalidKeySpecException {
        return createPassword(projectId, privateKeyBytes, "RSA", SignatureAlgorithm.RS256);
    }

    private static String createPassword(String projectId, byte[] privateKeyBytes, String algorithmName, SignatureAlgorithm signatureAlgorithm) throws NoSuchAlgorithmException, InvalidKeySpecException {
        Instant now = Instant.now();
        // The device will be disconnected after the token expires, and will have to reconnect with a new token. 
        // The audience field should always be set to the GCP project id.
        JwtBuilder jwtBuilder =
                Jwts.builder()
                        .setIssuedAt(Date.from(now))
                        .setExpiration(Date.from(now.plus(Duration.ofMinutes(20))))
                        .setAudience(projectId);

        PKCS8EncodedKeySpec spec = new PKCS8EncodedKeySpec(privateKeyBytes);
        KeyFactory kf = KeyFactory.getInstance(algorithmName);

        return jwtBuilder.signWith(signatureAlgorithm, kf.generatePrivate(spec)).compact();
    }

}
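
If you are curious what that password actually looks like, here is a minimal sketch that builds the same kind of token with nothing but the JDK. It is not what the jjwt library does internally, and the exact header jjwt emits may differ; the class and method names are mine. The point is only to show the three dot-separated parts (header, claims, signature) and that the signature can be checked with the matching public key.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.Signature;
import java.util.Base64;

// Hypothetical illustration of an RS256 JWT; in the app itself the jjwt library does this work.
final class JwtAnatomySketch {

    private static final Base64.Encoder URL_ENCODER = Base64.getUrlEncoder().withoutPadding();

    private JwtAnatomySketch() {
    }

    /** Builds header.payload.signature with iat, exp and aud claims (times in epoch seconds). */
    static String createToken(String projectId, long issuedAtSeconds, long lifetimeSeconds, PrivateKey key) {
        String header = "{\"alg\":\"RS256\",\"typ\":\"JWT\"}";
        // Assumes projectId needs no JSON escaping (no quotes or backslashes).
        String payload = "{\"iat\":" + issuedAtSeconds
                + ",\"exp\":" + (issuedAtSeconds + lifetimeSeconds)
                + ",\"aud\":\"" + projectId + "\"}";
        String signingInput = encode(header) + "." + encode(payload);
        try {
            Signature signer = Signature.getInstance("SHA256withRSA");
            signer.initSign(key);
            signer.update(signingInput.getBytes(StandardCharsets.US_ASCII));
            return signingInput + "." + URL_ENCODER.encodeToString(signer.sign());
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Could not sign token", e);
        }
    }

    /** Checks the signature over the first two segments with the matching public key. */
    static boolean verify(String token, PublicKey key) {
        int lastDot = token.lastIndexOf('.');
        try {
            Signature verifier = Signature.getInstance("SHA256withRSA");
            verifier.initVerify(key);
            verifier.update(token.substring(0, lastDot).getBytes(StandardCharsets.US_ASCII));
            return verifier.verify(Base64.getUrlDecoder().decode(token.substring(lastDot + 1)));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Could not verify token", e);
        }
    }

    /** Throwaway 2048-bit key pair, for trying the sketch out only. */
    static KeyPair newDemoKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("RSA is not available on this JVM", e);
        }
    }

    private static String encode(String json) {
        return URL_ENCODER.encodeToString(json.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        KeyPair pair = newDemoKeyPair();
        long nowSeconds = System.currentTimeMillis() / 1000;
        String token = createToken("your-project-id", nowSeconds, 20 * 60, pair.getPrivate());
        System.out.println(token);
        System.out.println("signature valid: " + verify(token, pair.getPublic()));
    }
}
```

The middle segment is only Base64url-encoded, not encrypted, so anyone holding the token can read the claims. What the private key adds is proof of who issued it.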

Once we have a password we can use it to talk to IoT Core securely. Communication involves three steps.

– Connecting to IoT Core
– Subscribing to config changes and other topics
– Publishing to events and other topics
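
All three steps depend on a handful of strings with a fixed shape: the MQTT client ID used when connecting, and the per-device topic names. As a small sketch, they can be pulled into one place like this. The patterns are the ones used in the code in this post; the helper class itself is hypothetical.

```java
// Hypothetical helper collecting the string patterns used elsewhere in this post.
final class IotCorePaths {

    private IotCorePaths() {
    }

    /** The MQTT client ID identifies the device by its full resource path. */
    static String clientId(String projectId, String cloudRegion, String registryId, String deviceId) {
        return "projects/" + projectId
                + "/locations/" + cloudRegion
                + "/registries/" + registryId
                + "/devices/" + deviceId;
    }

    /** Topic the device subscribes to in order to receive configuration changes. */
    static String configTopic(String deviceId) {
        return "/devices/" + deviceId + "/config";
    }

    /** Topic the device publishes its events to. */
    static String eventsTopic(String deviceId) {
        return "/devices/" + deviceId + "/events";
    }

    public static void main(String[] args) {
        System.out.println(clientId("your-project-id", "europe-west1", "my-devices", "my-test-raspberry-pi"));
        System.out.println(configTopic("my-test-raspberry-pi"));
        System.out.println(eventsTopic("my-test-raspberry-pi"));
    }
}
```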

Honestly, it is quite a mundane task. I’ve written the class below to do it for you. To take this code further you would want to look at the messageArrived callback and react to your messages.

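import android.content.Context;
import android.util.Log;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.concurrent.TimeUnit;

public class IotCoreCommunicator {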

    private static final String SERVER_URI = "ssl://mqtt.googleapis.com:8883";

    public static class Builder {

        private Context context;
        private String projectId;
        private String cloudRegion;
        private String registryId;
        private String deviceId;
        private int privateKeyRawFileId;

        public Builder withContext(Context context) {
            this.context = context;
            return this;
        }

        public Builder withProjectId(String projectId) {
            this.projectId = projectId;
            return this;
        }

        public Builder withCloudRegion(String cloudRegion) {
            this.cloudRegion = cloudRegion;
            return this;
        }

        public Builder withRegistryId(String registryId) {
            this.registryId = registryId;
            return this;
        }

        public Builder withDeviceId(String deviceId) {
            this.deviceId = deviceId;
            return this;
        }

        public Builder withPrivateKeyRawFileId(int privateKeyRawFileId) {
            this.privateKeyRawFileId = privateKeyRawFileId;
            return this;
        }

        public IotCoreCommunicator build() {
            if (context == null) {
                throw new IllegalStateException("context must not be null");
            }

            if (projectId == null) {
                throw new IllegalStateException("projectId must not be null");
            }
            if (cloudRegion == null) {
                throw new IllegalStateException("cloudRegion must not be null");
            }
            if (registryId == null) {
                throw new IllegalStateException("registryId must not be null");
            }
            if (deviceId == null) {
                throw new IllegalStateException("deviceId must not be null");
            }
            String clientId = "projects/" + projectId + "/locations/" + cloudRegion + "/registries/" + registryId + "/devices/" + deviceId;

            if (privateKeyRawFileId == 0) {
                throw new IllegalStateException("privateKeyRawFileId must not be 0");
            }
            MqttAndroidClient client = new MqttAndroidClient(context, SERVER_URI, clientId);
            IotCorePasswordGenerator passwordGenerator = new IotCorePasswordGenerator(projectId, context.getResources(), privateKeyRawFileId);
            return new IotCoreCommunicator(client, deviceId, passwordGenerator);
        }

    }

    private final MqttAndroidClient client;
    private final String deviceId;
    private final IotCorePasswordGenerator passwordGenerator;

    IotCoreCommunicator(MqttAndroidClient client, String deviceId, IotCorePasswordGenerator passwordGenerator) {
        this.client = client;
        this.deviceId = deviceId;
        this.passwordGenerator = passwordGenerator;
    }

    public void connect() {
        monitorConnection();
        clientConnect();
        subscribeToConfigChanges();
    }

    private void monitorConnection() {
        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                Log.e("TUT", "connection lost", cause);
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                Log.d("TUT", "message arrived " + topic + " MSG " + message);
                // You need to do something with messages when they arrive
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                Log.d("TUT", "delivery complete " + token);
            }
        });
    }

    private void clientConnect() {
        try {
            MqttConnectOptions connectOptions = new MqttConnectOptions();
            // Note that Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we explicitly set this.
            // If you don't, the server will immediately close its connection to your device.
            connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

            // With Google Cloud IoT Core, the username field is ignored, however it must be set for the
            // Paho client library to send the password field. The password field is used to transmit a JWT to authorize the device.
            connectOptions.setUserName("unused-but-necessary");
            connectOptions.setPassword(passwordGenerator.createJwtRsaPassword());

            IMqttToken iMqttToken = client.connect(connectOptions);
            iMqttToken.setActionCallback(new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d("TUT", "success, connected");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.e("TUT", "failure, not connected", exception);
                }
            });
            iMqttToken.waitForCompletion(TimeUnit.SECONDS.toMillis(30));
            Log.d("TUT", "IoT Core connection established.");
        } catch (MqttException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Configuration is managed and sent from the IoT Core Platform
     */
    private void subscribeToConfigChanges() {
        try {
            client.subscribe("/devices/" + deviceId + "/config", 1);
        } catch (MqttException e) {
            throw new IllegalStateException(e);
        }
    }

    public void publishMessage(String subtopic, String message) {
        String topic = "/devices/" + deviceId + "/" + subtopic;
        // The key is quoted so the payload is valid JSON; the message itself is not escaped here
        String payload = "{\"msg\":\"" + message + "\"}";
        MqttMessage mqttMessage = new MqttMessage(payload.getBytes());
        mqttMessage.setQos(1);
        try {
            client.publish(topic, mqttMessage);
            Log.d("TUT", "IoT Core message published. To topic: " + topic);
        } catch (MqttException e) {
            throw new IllegalStateException(e);
        }
    }

    public void disconnect() {
        try {
            client.disconnect();
            Log.d("TUT", "IoT Core disconnect requested.");
        } catch (MqttException e) {
            throw new IllegalStateException(e);
        }
    }

}
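
As mentioned, the place to take this further is the messageArrived callback. One way to keep it tidy, sketched below under my own naming, is a tiny router that maps a topic to a handler and hands it the payload as text. This is plain Java with no Paho types, so it assumes you call it yourself from messageArrived with something like `router.dispatch(topic, message.getPayload())`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper: routes an incoming MQTT message to a handler registered for its topic.
final class MessageRouter {

    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    /** Registers a handler for an exact topic name; a later registration replaces an earlier one. */
    MessageRouter on(String topic, Consumer<String> handler) {
        handlers.put(topic, handler);
        return this;
    }

    /**
     * Decodes the payload as UTF-8 and passes it to the handler for the topic.
     * Returns false if nothing is registered for that topic.
     */
    boolean dispatch(String topic, byte[] payload) {
        Consumer<String> handler = handlers.get(topic);
        if (handler == null) {
            return false;
        }
        handler.accept(new String(payload, StandardCharsets.UTF_8));
        return true;
    }

    public static void main(String[] args) {
        MessageRouter router = new MessageRouter()
                .on("/devices/my-test-raspberry-pi/config",
                        config -> System.out.println("new config: " + config));

        byte[] payload = "{\"led\":true}".getBytes(StandardCharsets.UTF_8);
        router.dispatch("/devices/my-test-raspberry-pi/config", payload);
    }
}
```

Matching on the exact topic string is enough here because the class above only subscribes to the config topic. If you subscribe with wildcards you would need real topic matching instead.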

Now that a connection can be established, your device can receive configuration changes and publish its own events. The MainActivity below connects to Google Cloud IoT Core over MQTT on a background thread and subscribes to configuration changes. It then publishes 100 event messages, one per second, from that same thread.

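import android.app.Activity;
import android.os.Bundle;
import android.os.Handler;
import android.os.HandlerThread;

import java.util.concurrent.TimeUnit;

public class MainActivity extends Activity {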

    private IotCoreCommunicator communicator;
    private Handler handler;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        // Setup the communication with your Google IoT Core details
        communicator = new IotCoreCommunicator.Builder()
                .withContext(this)
                .withCloudRegion("your-region") // ex: europe-west1
                .withProjectId("your-project-id")   // ex: supercoolproject23236
                .withRegistryId("your-registry-id") // ex: my-devices
                .withDeviceId("a-device-id") // ex: my-test-raspberry-pi
                .withPrivateKeyRawFileId(R.raw.rsa_private)
                .build();

        HandlerThread thread = new HandlerThread("MyBackgroundThread");
        thread.start();
        handler = new Handler(thread.getLooper());
        handler.post(connectOffTheMainThread); // Use whatever threading mechanism you want
    }

    private final Runnable connectOffTheMainThread = new Runnable() {
        @Override
        public void run() {
            communicator.connect();

            handler.post(sendMqttMessage);
        }
    };

    private final Runnable sendMqttMessage = new Runnable() {
        private int i;

        /**
         * We post 100 messages as an example, 1 a second
         */
        @Override
        public void run() {
            if (i == 100) {
                return;
            }

            // events is the default telemetry subtopic; publishMessage expands it to /devices/<deviceId>/events
            String subtopic = "events";
            // Your message you want to send
            String message = "Hello World " + i++;
            communicator.publishMessage(subtopic, message);

            handler.postDelayed(this, TimeUnit.SECONDS.toMillis(1));
        }
    };

    @Override
    protected void onDestroy() {
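        // Drop any pending publishes and stop the background thread before disconnecting
        handler.removeCallbacksAndMessages(null);
        handler.getLooper().quitSafely();
        communicator.disconnect();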
        super.onDestroy();
    }
}
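
One thing the example does not handle is token expiry. The JWT created in IotCorePasswordGenerator is valid for 20 minutes, after which the device is disconnected and has to reconnect with a new token. Below is a rough sketch of the timing side of that, with names of my own choosing: it works out when to refresh, leaving a safety margin before the token runs out. The actual reconnect is left to you, and I have not tested this against IoT Core.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: decides when a connection should be refreshed with a new token.
final class TokenRefreshPolicy {

    private final Duration tokenLifetime;
    private final Duration safetyMargin;

    TokenRefreshPolicy(Duration tokenLifetime, Duration safetyMargin) {
        if (safetyMargin.isNegative() || safetyMargin.compareTo(tokenLifetime) >= 0) {
            throw new IllegalArgumentException("safetyMargin must be between zero and tokenLifetime");
        }
        this.tokenLifetime = tokenLifetime;
        this.safetyMargin = safetyMargin;
    }

    /** The moment at which a reconnect with a new token should start. */
    Instant refreshAt(Instant issuedAt) {
        return issuedAt.plus(tokenLifetime).minus(safetyMargin);
    }

    /** True once the refresh moment has been reached. */
    boolean shouldRefresh(Instant issuedAt, Instant now) {
        return !now.isBefore(refreshAt(issuedAt));
    }

    /** Delay until the refresh moment in milliseconds, never negative. */
    long delayMillis(Instant issuedAt, Instant now) {
        return Math.max(0L, Duration.between(now, refreshAt(issuedAt)).toMillis());
    }

    public static void main(String[] args) {
        // 20 minutes matches the expiry used in this post; the 1 minute margin is an arbitrary choice.
        TokenRefreshPolicy policy = new TokenRefreshPolicy(Duration.ofMinutes(20), Duration.ofMinutes(1));
        Instant issuedAt = Instant.now();
        System.out.println("refresh in ms: " + policy.delayMillis(issuedAt, issuedAt));
    }
}
```

In the activity above you could pass `delayMillis(...)` to `handler.postDelayed` with a Runnable that calls `communicator.disconnect()` followed by `communicator.connect()`, since connect() generates a new password each time.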

That’s it! Although the talking was brief in this tutorial, I feel the code speaks for itself. You can now communicate between your device and Cloud IoT Core, receive config changes, and react to messages.

If you need to understand the cloud setup I would recommend these tutorials:

https://cloud.google.com/iot/docs/quickstart
https://cloud.google.com/iot/docs/how-tos/devices
https://cloud.google.com/iot/docs/how-tos/config/configuring-devices
https://cloud.google.com/iot/docs/how-tos/credentials/keys

Any questions? You can always reach out to me on Twitter, or post something on Stack Overflow and ping me the link.

This code is all available as open source on GitHub: https://github.com/blundell/CloudIoTCoreMQTTExample