This blog was written by Ahsan Hadi and co-written by Ibrar Ahmed.
Advances in Postgres in recent releases have opened up opportunities for expanding features that support data integration. The most compelling of these new features are called Foreign Data Wrappers (FDWs). FDWs essentially act as a pipeline for data to move to and from Postgres and other kinds of databases as if the different solutions were a single database. EnterpriseDB has developed new FDWs for MongoDB and MySQL. Now, we have released one for Hadoop.
The FDW for Hadoop can be found on EDB’s GitHub page.
Foreign Data Wrappers enable Postgres to act as the central hub, a federated database, in the enterprise. And the features have emerged at a good time as integration becomes more of a challenge. Database administrators (DBAs) have begun using more and more specialized NoSQL-only solutions to address specific problems. Now DBAs need to address getting the data in all of these different solutions to tell one single story, or lend value to one another.
Transparent data integration for Postgres-based databases. Download Now.
Since EDB released the new FDWs with both read and write capability to the open source PostgreSQL community, usage has grown tremendously. More and more developers are contributing features and fixes.
Now that we’ve established some context for the FDWs, let’s explore the Hadoop FDW in more detail. There are few popular terms worth exploring first:
Hadoop Distributed File System (HDFS) – HDFS is a distributed file system for large data storage and processing. It’s java-based and provides scalable and reliable storage that is designed to span large clusters of commodity servers. The data is stored in flat files and format-free. As a result, it’s used for very large data sets and a typical HDFS file is a GB to TB in size. Applications that run HDFS require streaming access to their data sets.
MapReduce - MapReduce is a programming model and associated implementation for processing and generating large data sets. The model was inspired by the map and reduce functions commonly used in functional programming. MapReduce jobs are written in java and are used for performing statistical analysis, aggregates or other complex processing on large data sets stored in HDFS.
Hive server - The Apache Hive is data warehouse software that facilitates querying and managing large datasets residing in distributed storage i.e. an HDFS. Hive defines a simple query-like language called QL which is used for querying and manipulating large data sets stored in an HDFS. The QL language is similar to SQL and provides similar constructs for retrieving data. Hive server is an optional service that allows remote clients to send requests to HIVE using various programming languages and retrieve results.
Foreign Data Wrapper (FDW) – While we have introduced FDWs already, it’s good to know they are based on Postgres implementation of the SQL/MED (SQL management of external data) specification of the SQL standard. It is a standard way of accessing external data stores ranging from SQL and NoSQL-only databases to flat files. FDWs provide a SQL interface for accessing remote objects and large data objects stored in remote data stores. The FDWs supported by EnterpriseDB are postgres_fdw, oracle_fdw, mongo_fdw, mysql_fdw and now we’re adding HDFS_fdw to the list.
Using the HDFS FDW
The following components need to be installed in order to use the HDFS FDW:
* PostgreSQL or EDB’s Postgres Plus Advanced Server
* Hadoop
* Hive server 1 or Hive server 2
* The HDFS FDW extension
(The HDFS FDW github webpage provides clear instructions on how to set up HDFS FDW and its required components.)
This is the first version and it supports the following functionality:
1. Support for hive server 1 and hive server 2
The queries will be routed through hive server 1 or hive server 2 to HDFS. (You can run one at a time but not both.) As shown in the above diagram, the HDFS FDW sends queries to hive server and hive server communicates with HDFS, returning the results back to the client. Hive supports a number of custom map reduce jobs that are executed for faster processing of data. The map reduce jobs are called automatically by hive server for aggregates, mathematical operations and other complex operations. Hive server 2 can be seen as a superset of hive server 1 with some additional features like concurrency and security.
As explained above, data is stored in flat files in an HDFS file system. Using the hive client the user needs to create a table, which maps for flat files in HDFS. In the table definition created in hive, the user specifies the data delimiter. The table can be queried in the hive client using a language called QL which is similar to SQL. The working example below shows how this is done.
2. Select functionality
The first version of HDFS FDW was ready only. It provides complete support for SELECT over foreign tables, which map to the corresponding table in hive, which then maps to the corresponding flat file in HDFS. The query clauses, i.e. joins, functions in target list, etc., that can’t be pushed to the remote server are processed locally in PostgreSQL and Postgres Plus Advanced Server.
3. Map Reduce Jobs
Hive server will execute built-in map reduce jobs for aggregates or other complex operation used in the where clause of the select query. Map reduce jobs are used for querying and manipulating large data sets in HDFS. In future releases of HDFS FDW, we will allow the user to write custom map reduce jobs for manipulating and processing data stored in HDFS. The user can trigger the custom map reduce job by using a heavy processing function in the where clause which will kick off the corresponding map reduce job.
4. Push-down
Push-down means pushing down appropriate quals of the query to the remote server for faster processing. Currently the PG FDW machinery only allows where clause and target columns to be pushed down to the remote server. The HDFS FDW also has the capability of pushing down where clause and target column list to the foreign server.
HDFS_FDW in Action
Here is how it works, for more detailed instructions please visit our github home page of HDFS_FDW.
-- load extension first time after install
CREATE EXTENSION hdfs_fdw;
-- create server object
CREATE SERVER hdfs_server
FOREIGN DATA WRAPPER hdfs_fdw
OPTIONS (host '127.0.0.1');
-- create user mapping
CREATE USER MAPPING FOR postgres
SERVER hdfs_server;
-- create foreign table
CREATE FOREIGN TABLE weblogs
(
client_ip text,
full_request_date text,
day text,
Month text,
month_num int,
year text,
hour text,
minute text,
second text,
timezone text,
http_verb text,
uri text,
http_status_code text,
bytes_returned text,
referrer text,
user_agent text
)
SERVER hdfs_server
OPTIONS (dbname 'db', table_name 'weblogs');
-- select from table
postgres=# select distinct client_ip IP,count(*) from weblogs group by IP having count(*) > 5000;
ip | count
-----------------+-------
683.615.622.618 | 13505
14.323.74.653 | 16194
13.53.52.13 | 5494
361.631.17.30 | 64979
363.652.18.65 | 10561
325.87.75.36 | 6498
322.6.648.325 | 13242
325.87.75.336 | 6500
(8 rows)
create table premium_ip
(
client_ip text, category text
);
insert into premium_ip values ('683.615.622.618','Category A');
insert into premium_ip values ('14.323.74.653','Category A');
insert into premium_ip values ('13.53.52.13','Category A');
insert into premium_ip values ('361.631.17.30','Category A');
insert into premium_ip values ('361.631.17.30','Category A');
insert into premium_ip values ('325.87.75.336','Category B');
postgres=# select hd.client_ip IP,pr.category,count(hd.client_ip)
from weblogs hd, premium_ip pr
where hd.client_ip = pr.client_ip
and hd.year = '2011'
group by hd.client_ip,pr.category;
ip | category | count
-----------------+------------+-------
14.323.74.653 | Category A | 9459
361.631.17.30 | Category A | 76386
683.615.622.618 | Category A | 11463
13.53.52.13 | Category A | 3288
325.87.75.336 | Category B | 3816
(5 rows)
postgres=# explain verbose select hd.client_ip IP,pr.category,count(hd.client_ip)
from weblogs hd, premium_ip pr
where hd.client_ip = pr.client_ip
and hd.year = '2011'
group by hd.client_ip,pr.category;
QUERY PLAN
---------------------------------------------------------------------------------------------- HashAggregate (cost=221.40..264.40 rows=4300 width=64)
Output: hd.client_ip, pr.category, count(hd.client_ip)
Group Key: hd.client_ip, pr.category
-]]]]> Merge Join (cost=120.35..189.15 rows=4300 width=64)
Output: hd.client_ip, pr.category
Merge Cond: (pr.client_ip = hd.client_ip)
-]]]]> Sort (cost=60.52..62.67 rows=860 width=64)
Output: pr.category, pr.client_ip
Sort Key: pr.client_ip
-]]]]> Seq Scan on public.premium_ip pr (cost=0.00..18.60 rows=860 width=64)
Output: pr.category, pr.client_ip
-]]]]> Sort (cost=59.83..62.33 rows=1000 width=32)
Output: hd.client_ip
Sort Key: hd.client_ip
-]]]]> Foreign Scan on public.weblogs hd (cost=100.00..10.00 rows=1000 width=32)
Output: hd.client_ip
Remote SQL: SELECT client_ip FROM weblogs WHERE ((year = '2011’))
Future Enhancements
This is the first release of HDFS_FDW with new read and write capabilities. We are already planning future releases with the following functionality:
- Support writeability via Hbase
- Bulk load of data
- More pushdown features (joins, aggregates, sort etc)
- Custom Map Reduce Jobs
- Support for Flum/Impala server
- Authentication support
So stay tuned for more to come from EDB!
Please visit our site for more information or contact us with any questions.
Ahsan Adhi is Senior Director Product Development at EnterpriseDB.
Abrar Ahmed is a Database Architect at EnterpriseDB.