Saturday, January 28, 2017

HOW-TO use Kafka to stream Postgresql traffic with bottledwater_pg on a Raspberry Pi 3

If you want to stream changes from your Postgresql database to Kafka on a Raspberry Pi 3, this is how I did it. I'm sure there are other ways to accomplish this; all I want to do here is show you how I managed it. I'll preface my comments with a hash '#'.



First: The links.
bottledwater_pg
Apache Kafka
Apache Zookeeper
Postgresql
Confluent
Apache Avro 

Here goes:
sudo apt-get update && sudo apt-get -y upgrade
sudo apt-get install oracle-java8-jdk git
I'm pretty sure you only need a JRE, but I installed the JDK for a different project.

Install zookeeper and start
This is optional - the confluent package has a limited version of zookeeper

Compile and install postgresql from source
The version of postgresql provided by Debian jessie is older than that required by bottledwater_pg

Adjust the values of pg_hba.conf and postgresql.conf. I used 'kafka_user' but that's up to you:
sudo -i
echo "wal_level = logical
max_wal_senders = 8
wal_keep_segments = 4
max_replication_slots = 4" >> /usr/local/pgsql/data/postgresql.conf


echo "local   replication     kafka_user                 trust
host    replication    kafka_user  127.0.0.1/32   trust
host    replication    kafka_user  ::1/128        trust" >> /usr/local/pgsql/data/pg_hba.conf
# restart postgresql afterwards; wal_level and max_replication_slots are only read at startup
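
Because `>>` appends every time it runs, repeating the step above leaves duplicate lines in the config files. Here is a minimal sketch of a guard against that. `append_once` is a name made up for this example, not a postgresql tool:

```shell
# append_once FILE LINE: append LINE to FILE only if that exact line is not already there.
# append_once is a hypothetical helper name, not part of postgresql.
append_once() {
    file=$1
    line=$2
    # -x matches the whole line, -F treats it as a fixed string, -q keeps grep quiet
    if ! grep -qxF -- "$line" "$file" 2>/dev/null; then
        printf '%s\n' "$line" >> "$file"
    fi
}

# Example (run as a user who can write to the data directory):
#   append_once /usr/local/pgsql/data/postgresql.conf "wal_level = logical"
#   append_once /usr/local/pgsql/data/postgresql.conf "max_replication_slots = 4"
```

Running it twice with the same arguments leaves the file unchanged the second time.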

Now add the user "kafka_user" to postgres and create its database, also named 'kafka_user'. This assumes that you followed the link above to install postgresql:
export PATH=$PATH:/usr/local/pgsql/bin
psql -Upostgres -c "create role kafka_user with SUPERUSER LOGIN;"
createdb -Upostgres kafka_user
psql -Upostgres -c "REVOKE CONNECT ON DATABASE kafka_user FROM PUBLIC;"
psql -Upostgres -c "GRANT CONNECT ON DATABASE kafka_user TO kafka_user;"
Now you need Apache Avro, or at least the C and maybe C++ portions. I couldn't get the complete Avro package to compile out of the box. Originally I installed all of the prerequisites, but I ended up just installing the C and C++ portions.

Get the confluent package:

wget http://packages.confluent.io/archive/3.0/confluent-3.0.1-2.11.zip
sudo unzip confluent-3.0.1-2.11.zip -d /opt
Install bottledwater_pg:
sudo -i
cd /opt
git clone https://github.com/confluentinc/bottledwater-pg.git
apt-get install libsnappy-dev librdkafka-dev libcurl4-openssl-dev libpq-dev
export PKG_CONFIG_PATH=/lib/pkgconfig/
ln -s /usr/local/pgsql/bin/pg_config /usr/bin/pg_config
cd bottledwater-pg
make
make install
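
The build stops partway through if a tool it relies on is missing. As a rough pre-flight check, this sketch lists any command that is not on the PATH. `missing_tools` is a made-up name, and the tools in the example (git, make, pkg-config, pg_config) are a guess at what the build calls, based on the steps above:

```shell
# missing_tools CMD...: print each CMD that cannot be found on the PATH.
# Returns 0 when everything was found, 1 otherwise.
# missing_tools is a hypothetical helper, not part of bottledwater_pg.
missing_tools() {
    status=0
    for tool in "$@"; do
        if ! command -v "$tool" >/dev/null 2>&1; then
            printf '%s\n' "$tool"
            status=1
        fi
    done
    return "$status"
}

# Example:
#   missing_tools git make pkg-config pg_config || echo "install the tools listed above first"
```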
I added the kafka_user to my Raspberry Pi because I didn't want to figure out the right postgres connection string:
sudo useradd kafka_user
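
If you would rather not add a system user, libpq also accepts a connection URI that names the user and database explicitly. This is an untested sketch for this setup: `pg_uri` is a helper name made up for the example, and whether the connection is allowed still depends on your pg_hba.conf.

```shell
# pg_uri USER HOST DBNAME: print a libpq connection URI.
# pg_uri is a hypothetical helper; the layout follows libpq's postgres://user@host/dbname form.
pg_uri() {
    printf 'postgres://%s@%s/%s\n' "$1" "$2" "$3"
}

# Example (not verified with bottledwater on the Pi):
#   ./kafka/bottledwater --postgres="$(pg_uri kafka_user localhost kafka_user)"
pg_uri kafka_user localhost kafka_user
```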
Almost there. Now just follow the instructions on bottledwater_pg's github page:
# start kafka server as root (zookeeper should already be running)
cd /opt/confluent-3.0.1
./bin/kafka-server-start ./etc/kafka/server.properties

# start schema registry
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

# start bottled water
## I had to do this as user kafka_user
kafka_user@raspberrypi:/opt/bottledwater-pg $ ./kafka/bottledwater --postgres=postgres://localhost
-- or --
runuser -l kafka_user -c "cd /opt/bottledwater-pg; ./kafka/bottledwater --postgres=postgres://localhost"
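
Each service above needs the previous one to be listening before it starts, and on a Pi that can take a while. A small retry loop can wait for that. This is only a sketch: `retry_until` is a made-up helper, and the `nc -z` probe in the example assumes netcat is installed (2181 is the zookeeper port used in this post).

```shell
# retry_until ATTEMPTS DELAY CMD...: run CMD until it succeeds, at most ATTEMPTS times,
# sleeping DELAY seconds between tries. Returns 0 on success, 1 if every try failed.
# retry_until is a hypothetical helper, not part of the confluent package.
retry_until() {
    attempts=$1
    delay=$2
    shift 2
    n=1
    while [ "$n" -le "$attempts" ]; do
        if "$@"; then
            return 0
        fi
        # do not sleep after the final failed attempt
        if [ "$n" -lt "$attempts" ]; then
            sleep "$delay"
        fi
        n=$((n + 1))
    done
    return 1
}

# Example (assumes netcat is installed):
#   retry_until 30 2 nc -z localhost 2181 && ./bin/kafka-server-start ./etc/kafka/server.properties
```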

# create table in postgresql (this creates a topic, which should have the same name as the table). Make sure it has a primary key.
/usr/local/pgsql/bin/psql -Ukafka_user kafka_user -c "CREATE TABLE table_with_pk(id INT NOT NULL PRIMARY KEY, test_text VARCHAR, a_float FLOAT);"
# list topics
root@raspberrypi:/opt/confluent-3.0.1# bin/kafka-topics --list --zookeeper localhost:2181
Java HotSpot(TM) Server VM warning: G1 GC is disabled in this release.
_schemas
table_with_pk

# listen to topic
root@raspb32:/opt/confluent-3.0.1# ./bin/kafka-avro-console-consumer --topic table_with_pk --zookeeper localhost:2181  --property print.key=true --from-beginning

Insert some data into your table, and you can see it in the consumer:
terminal running postgres:
kafka_user=# SELECT * FROM table_with_pk;
 id |  test_text  | a_float
----+-------------+---------
  1 | some text 1 |     1.8
  2 | some text 2 |     2.8
  3 | some text 3 | 2.87609
  4 | test text 4 |      12
(4 rows)

kafka_user=# INSERT INTO table_with_pk VALUES(5,'test text5', 17.98);
INSERT 0 1

terminal running a kafka consumer:
root@raspb32:/opt/confluent-3.0.1# ./bin/kafka-avro-console-consumer --topic table_with_pk --zookeeper localhost:2181  --property print.key=true --from-beginning
OpenJDK Zero VM warning: G1 GC is disabled in this release.
...
{"id":{"int":4}}    {"id":{"int":4},"test_text":{"string":"test text 4"},"a_float":{"double":12.0}}
{"id":{"int":5}}    {"id":{"int":5},"test_text":{"string":"test text5"},"a_float":{"double":17.98}}
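
With print.key=true each line holds the key and then the value. To keep only the value (for piping into another tool), something like this sketch should work. It assumes the two are separated by a tab, which I believe is the console consumer's default; `value_only` is a made-up name.

```shell
# value_only: read consumer lines on stdin and print just the value part.
# Assumes key and value are separated by a single tab character.
# Lines without a tab (such as JVM warnings) are skipped.
value_only() {
    awk -F '\t' 'NF >= 2 { print $2 }'
}

# Example with a line shaped like the consumer output:
printf '{"id":{"int":5}}\t{"id":{"int":5},"test_text":{"string":"test text5"}}\n' | value_only
```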
Please let me know if you run into any problems - or especially if there are errors in this HOW-TO.

Thanks.
