通过jpype实现Python语言调用Hadoop Java Client操作HDFS
小标 2018-12-20 来源 : 阅读 1355 评论 0

摘要:本文主要向大家介绍了通过jpype实现Python语言调用Hadoop Java Client操作HDFS,通过具体的内容向大家展示,希望对大家学习Python语言有所帮助。

本文主要向大家介绍了通过jpype实现Python语言调用Hadoop Java Client操作HDFS,通过具体的内容向大家展示,希望对大家学习Python语言有所帮助。

通过python操作HDFS有非常好用的模块hdfs3 ,其底层依赖c++模块libhdfs3,起初libhdfs3不支持acl权限校验对于设置了acl权限的目录和文件会报NOTFOUND错误(现已解决,可见libhdfs3-downstream )
起初想过通过python调用java的hadoop jar包来解决这个问题,对其做出了部分尝试,但是卡在了获取文件输入输出流,没有很好的解决办法。就当python调用java包的一次尝试吧。
通过python操作java的jar包有很多方案,这里我们介绍两种,jpype和py4j。两者都能够让python代码方便的调用java代码,从而帮助我们解决python在某些领域遇到的问题。
两者相比较而言py4j会比较强一点,因为jpype的作者不搞jpype了,说jpype底层设计就有一些问题,懒得从头再改,反而弄了个py4j。。。而且py4j的社区比较活跃,毕竟pyspark就使用的py4j,经过了市场的考验。但是我们今天讲的是jpype。。关于py4j的相关信息大家可以看其官网 。
安装jpype
我机器安装了anaconda2,通过conda进行安装命令如下:
 conda install -c conda-forge jpype1
其他安装方式见文档 。
简单示例(调用JDK系统库)
import jpype
from jpype import *

if __name__ == "__main__":
    startJVM(jpype.getDefaultJVMPath())
    java.lang.System.out.println("Hello World")
    shutdownJVM()
关键的操作其实很简单,首先启动一个jvm,然后就可以引用相关的java方法了,然后使用完毕之后记得关闭jvm。py4j与之不同的是py4j不会帮你启动一个jvm,需要自己写java代码实现GatewayServer。
通过jpype操作HDFS
我们本意是想通过python调用Hadoop的java client来达到我们使用python去操作HDFS的目的。我们的实现步骤如下:

首先我们要开发一个java程序,在其内部封装一些常用的HDFS操作
通过jpype加载第三方jar包,封装python调用java的方法

引入我们封装的python的模块,即可愉快的操作HDFS
封装一个HDFS操作jar
首先,在pom.xml里面引入相关依赖及plugin。这里需要注意的是,我们要打包一个jar-with-dependencies,这样便于我们通过jpype加载jar包的操作。


 
    
      org.apache.hadoop
      hadoop-common
      2.7.3
    

    
      org.apache.hadoop
      hadoop-hdfs
      2.7.3
    

  

  
    
      
         maven-assembly-plugin 
        
          
            jar-with-dependencies
          

        

        
          
            make-assembly
            package
            
              single
            

          

        

      

    

  

然后我们新建一个HdfsFileSystem的类,里面实现一些简单的操作,比如copyFromLocal,mkdir等等
 package com.imooc.bigdata.python.hdfs;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;

/**
 * Created by jixin on 18-6-10.
 */
public class HdfsFileSystem {

  private UserGroupInformation ugi;
  private FileSystem fileSystem = null;
  private long lastLoginCheck = System.currentTimeMillis();
  private long maxCheckInterval = 10 * 60 * 1000;

  /**
   * HdfsFileSystem.
   */
  public HdfsFileSystem(String hadoopHome, String kerberosConf, String userName,
      String keytabPath, String hadoopUri) {
    System.setProperty("hadoop.home.dir", hadoopHome);
    System.setProperty("java.security.krb5.conf", kerberosConf);
    try {
      Configuration ugiConf = new Configuration();
      ugiConf.set(HADOOP_SECURITY_AUTHENTICATION, AuthenticationMethod.KERBEROS.name());
      UserGroupInformation.setConfiguration(ugiConf);
      ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(userName, keytabPath);
      fileSystem = doPrivileged(new PrivilegedExceptionAction() {
        @Override
        public FileSystem run() throws Exception {
          Configuration conf = new Configuration();
          String coreSite = hadoopHome + "/etc/hadoop/core-site.xml";
          String hdfsSite = hadoopHome + "/etc/hadoop/hdfs-site.xml";
          conf.addResource(new Path(coreSite));
          conf.addResource(new Path(hdfsSite));
          conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
          return FileSystem.get(new URI(hadoopUri), conf);
        }
      });
    } catch (IOException | InterruptedException e) {
      e.printStackTrace();
    }
  }

  public FileStatus[] listStatus(String path) throws IOException {
    this.checkLogin();
    return fileSystem.listStatus(new Path(path));
  }

  public boolean mkdirs(String path) throws IOException {
    this.checkLogin();
    return fileSystem.mkdirs(new Path(path));
  }

  public boolean mkdirs(String path, String mode) throws IOException {
    this.checkLogin();
    return fileSystem.mkdirs(new Path(path), new FsPermission(mode));
  }

  public FSDataInputStream open(String path, String fileContent, int bufferSize) throws IOException {
    this.checkLogin();
    return fileSystem.open(new Path(path), bufferSize);
  }

  public FSDataOutputStream create(String path) throws IOException {
    this.checkLogin();
    return fileSystem.create(new Path(path));
  }

  public FSDataOutputStream create(String path, boolean overwrite) throws IOException {
    this.checkLogin();
    return fileSystem.create(new Path(path), overwrite);
  }

  public FSDataOutputStream create(String path, int replication) throws IOException {
    this.checkLogin();
    return fileSystem.create(new Path(path), (short) replication);
  }

  public FSDataOutputStream create(String path, boolean overwrite, int bufferSize)
      throws IOException {
    this.checkLogin();
    return fileSystem.create(new Path(path), overwrite, bufferSize);
  }

  public FSDataOutputStream create(String path, boolean overwrite, int bufferSize, int replication,
      long blockSize) throws IOException {
    this.checkLogin();
    return fileSystem.create(new Path(path), overwrite, bufferSize, (short) replication, blockSize);
  }

  public boolean setReplication(String path, int replication) throws IOException {
    this.checkLogin();
    return fileSystem.setReplication(new Path(path), (short) replication);
  }

  public boolean delete(String path, boolean recursive) throws IOException {
    this.checkLogin();
    return fileSystem.delete(new Path(path), recursive);
  }

  public ContentSummary getContentSummary(String path) throws IOException {
    this.checkLogin();
    return fileSystem.getContentSummary(new Path(path));
  }

  public void copyFromLocalFile(String src, String dst) throws IOException {
    this.checkLogin();
    fileSystem.copyFromLocalFile(new Path(src), new Path(dst));
  }

  public void copyFromLocalFile(boolean delSrc, String src, String dst) throws IOException {
    this.checkLogin();
    fileSystem.copyFromLocalFile(delSrc, new Path(src), new Path(dst));
  }

  public void copyFromLocalFile(boolean delSrc, boolean overwrite, String src, String dst)
      throws IOException {
    this.checkLogin();
    fileSystem.copyFromLocalFile(delSrc, overwrite, new Path(src), new Path(dst));
  }

  public void copyToLocalFile(String src, String dst) throws IOException {
    this.checkLogin();
    fileSystem.copyToLocalFile(new Path(src), new Path(dst));
  }

  public void copyToLocalFile(boolean delSrc, String src, String dst) throws IOException {
    this.checkLogin();
    fileSystem.copyToLocalFile(delSrc, new Path(src), new Path(dst));
  }

  public void copyToLocalFile(boolean delSrc, String src, String dst, boolean useRawLocalFileSystem)
      throws IOException {
    this.checkLogin();
    fileSystem.copyToLocalFile(delSrc, new Path(src), new Path(dst), useRawLocalFileSystem);
  }

  public void moveFromLocalFile(String src, String dst) throws IOException {
    this.checkLogin();
    fileSystem.moveFromLocalFile(new Path(src), new Path(dst));
  }

  public void moveToLocalFile(String src, String dst)
      throws IOException {
    this.checkLogin();
    fileSystem.moveToLocalFile(new Path(src), new Path(dst));
  }

  public boolean exists(String path)
      throws IOException {
    this.checkLogin();
    return fileSystem.exists(new Path(path));
  }

  public boolean isDirectory(String path)
      throws IOException {
    this.checkLogin();
    return fileSystem.isDirectory(new Path(path));
  }

  public boolean isFile(String path)
      throws IOException {
    this.checkLogin();
    return fileSystem.isFile(new Path(path));
  }

  public boolean rename(String src, String dst)
      throws IOException {
    this.checkLogin();
    return fileSystem.rename(new Path(src), new Path(dst));
  }

  public FileStatus[] globStatus(String pathPattern)
      throws IOException {
    this.checkLogin();
    return fileSystem.globStatus(new Path(pathPattern));
  }

  public FsStatus getStatus(String path) throws IOException {
    this.checkLogin();
    return fileSystem.getStatus(new Path(path));
  }

  public FileStatus getFileStatus(String path) throws IOException {
    this.checkLogin();
    return fileSystem.getFileStatus(new Path(path));
  }

  public void setPermission(String path, String mode) throws IOException {
    this.checkLogin();
    fileSystem.setPermission(new Path(path), new FsPermission(mode));
  }

  public void setOwner(String path, String userName, String groupName) throws IOException {
    this.checkLogin();
    fileSystem.setOwner(new Path(path), userName, groupName);
  }

  public AclStatus getAclStatus(String path) throws IOException {
    this.checkLogin();
    return fileSystem.getAclStatus(new Path(path));
  }

  public void removeDefaultAcl(String path) throws IOException {
    this.checkLogin();
    fileSystem.removeDefaultAcl(new Path(path));
  }

  public void removeAcl(String path) throws IOException {
    this.checkLogin();
    fileSystem.removeAcl(new Path(path));
  }

  public FileSystem getFileSystem() {
    try {
      this.checkLogin();
      return fileSystem;
    } catch (IOException ioe) {
      ioe.printStackTrace();
    }
    return null;
  }

  private  T doPrivileged(PrivilegedExceptionAction action)
      throws IOException, InterruptedException {
    if (ugi == null) {
      throw new RuntimeException("hdfs login error");
    }
    try {
      ugi.checkTGTAndReloginFromKeytab();
    } catch (IOException e) {
      throw new RuntimeException("hdfs login error");
    }
    return ugi.doAs(action);
  }

  private synchronized void checkLogin() throws IOException {
    long now = System.currentTimeMillis();
    long interval = now - this.lastLoginCheck;
    if (interval > this.maxCheckInterval) {
      this.ugi.checkTGTAndReloginFromKeytab();
      this.lastLoginCheck = now;
    }
  }

}
封装python模块
# -*- coding: utf-8 -*-

from jpype import *

class HDFileSystem():
    # 初始化jvm and kerberos认证
    def __init__(self, user_name, keytab_path, hadoop_home, kerberos_conf='/etc/krb5.conf',
                 hdfs_uri='hdfs://namenode-1:9000'):
        java_hdfs_conf = "-Djava.class.path={0}".format('dependencies/hdfs-python.jar')
        startJVM(getDefaultJVMPath(), "-ea", java_hdfs_conf)
        java_hdfs_class = JClass('com.hfmlog.gavial.python.hdfs.HdfsFileSystem')
        self.filesystem = java_hdfs_class(hadoop_home, kerberos_conf, user_name, keytab_path, hdfs_uri)

    # 其他操作的封装
    def isdir(self, path):
        return self.filesystem.isDirectory(path)

    def isfile(self, path):
        return self.filesystem.isFile(path)

    def glob(self, path):
        return self.filesystem.globStatus(path)

    def ls(self, path):
        return self.filesystem.listStatus(path)

    def mkdir(self, path):
        return self.filesystem.mkdirs(path)

    def makedirs(self, path, mode="755"):
        return self.filesystem.mkdirs(path)

    def set_replication(self, path, replication):
        return self.filesystem.setReplication(path, replication)

    def mv(self, path1, path2):
        return self.filesystem.rename(path1, path2)

    def rm(self, path, recursive=True):
        return self.filesystem.delete(path, recursive)

    def exists(self, path):
        return self.filesystem.exists(path)

    def chmod(self, path, mode):
        self.filesystem.setPermission(path, mode)

    def chown(self, path, owner, group):
        self.filesystem.setOwner(path, owner, group)

    def get(self, src, dst, del_src=False):
        self.filesystem.copyToLocalFile(del_src, src, dst)

    def put(self, src, dst, del_src=False, overwrite=True):
        self.filesystem.copyFromLocalFile(del_src, src, dst, overwrite)

    def create(self, path, overwrite=True, replication=0, buff=0, block_size=0):
        pass

    def read(self, path, buff=1024):
        pass

    # 释放资源,终止jvm
    def __del__(self):
        shutdownJVM()
由上面代码可见,我们通过jpype加载第三方模块的时候需要启动jvm时传递-Djava.class.path=参数。
from jpype import *

if __name__ == "__main__":
    jars = ["/home/hadoop/JpypeTest.jar"]
    jvm_path = getDefaultJVMPath()
    jvm_cp = "-Djava.class.path={}".format(":".join(jars))

    startJVM(jvm_path, jvm_cp)

    # 获取 Main 类
    Main = JClass("com.imooc.bigdata.demo.JpypeTest")
    # 执行 main 函数,注意参数是 String[] args
    Main.main([])

    shutdownJVM()
测试
from hdfs import HDFileSystem
client = HDFileSystem(self.principal, self.keytab_file,self.hadoop_home, self._krb5conf,self._hdfs_uri)

client.exists('/tmp/test')
client.isdir('/tmp')
client.isfile('/tmp/test')
看到这,是不是想用py4j搞个大新闻了呢?    

本文由职坐标整理并发布,希望对同学们学习Python有所帮助,更多内容请关注职坐标编程语言Python频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 1 不喜欢 | 0
看完这篇文章有何感觉?已经有1人表态,100%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程