lsekfe 发表于 2022-12-19 15:47:31

Python和Java之间相互调用是怎么做到的?

 需求使用背景, 业务系统自定义了一套数据处理语言,支持sql,elasticsearch查询数据,然后经过一系列自定义处理,自定义处理支持执行python脚本,最终返回给调用端。
  具体执行流程,java服务执行页面输入的python语句,python语句调用java方法得到结果,python进行自定义处理后再返回给java服务,java服务最终返回具体的结果给客户端。
  语法示例 其中, | dbquery , | python 为自定义语法示例,最终是通过java执行返回结果。

| dbquery sql="select id,num,create_time from security_log"
  | python
  import sys
  from py4j.java_gateway import JavaGateway, GatewayParameters
  gateway = JavaGateway(gateway_parameters=GatewayParameters(port=int(sys.argv)))
  SESSION_ID = sys.argv
  
  # 获取java 实例类
  entry= gateway.entry_point
  # 调用 java具体方法,java返回具体的json
  preDataJson = entry.getPreDateJson(SESSION_ID)
  
  # 以下开始处理java返回的结果json
  import pandas as pd
  df = pd.read_json(preDataJson)
  rtn_data = df[:2].to_json(orient='records')
  # 处理完成的rtn_data 再设置给java 实例
  entry.setDatasetJson(SESSION_ID,rtn_data )

首先java服务中引用依赖。
   <dependency>
        <groupId>net.sf.py4j</groupId>
        <artifactId>py4j</artifactId>
        <version>0.10.7</version>
      </dependency>

首先java端定义使用pythonServer网关入口。
public class PythonServer {
  
      /**
     * python 网关服务
     */
      private GatewayServer gatewayServer;
  
      /**
     * python 服务启动端口
     */
      private int port = GatewayServer.DEFAULT_PORT;
  
      /**
     * 缓存当前几个 python reader
     */
      private Map<String, PythonComd> pythonComdMap = new ConcurrentHashMap<>();
  
      public PythonServer() {
        for (int i = 0; i < 10; i++) {
              if (gatewayServer == null) {
                  try {
                    gatewayServer = new GatewayServer(new EntryPoint(), port);
                    gatewayServer.start();
                    log.info("Python 网关启动成功,端口:" + port);
                  } catch (Exception e) {
                    log.warn("Python 网关启动失败,端口:" + port);
                    port += 2;//每次加2
                    log.warn("Python 网关尝试下一个启动端口:" + port);
                  }
              }
        }
        if (gatewayServer == null) {
              log.error("Python 网关启动失败 10 次,不再进行重试,终止启动。");
              System.exit(-1);
        }
      }
  
      /**
     * 打开一个 java to python 的会话,实际只是 put 一个识别自身的 uuid
     * @param uuid
     * @param pythonComd
     */
      public void openSession(String uuid, PythonComd pythonComd) {
        pythonComdMap.put(uuid, pythonComd);
      }
  
      /**
     * 关闭一个 java to python 的会话
     * @param uuid
     */
      public void closeSession(String uuid) {
        pythonComdMap.remove(uuid);
      }
  
      /**
     * 用于 python 与 java GatewayServer 通信的端口号
     * @return
     */
      public int getPort() {
        return port;
      }
  
      /**
     * 当 java 进程关闭时,结束 python 网关。
     */
      @PreDestroy
      public void shutdown() {
        gatewayServer.shutdown();
      }
  
      class EntryPoint {
  
        /**
           * 可以传递 json 给 python,但限制最大加载条数为 ENGINE_QUERY_LIMIT
           * 优点:结构化数据,支持嵌套数组。缺点:空数组时没有字段名。
           * @param uuid
           * @return
           */
        public String getPreDataJson(String uuid) {
              PythonComd pythonComd = pythonComdMap.get(uuid);
              if (pythonComd == null) {
                  return null;
              }
              Dataset<Row> preDataset = pythonComd.getPreDateset();
              if (preDataset == null) {
                  return null;
              }
              try {
                  SparkContext sparkContext = preDataset.sparkSession().sparkContext();
                  PythonConf conf = pythonComd.getConf();
                  String taskId = conf.getTaskId();
                  String description = AppConfig.ENGINE_ADDR + "/search?taskId=" + taskId;// 仅用于 spark web ui 显示,暂时没有扩展此接口。
                  sparkContext.setJobGroup(uuid, description, true);
                  sparkContext.setLocalProperty(CallSite.SHORT_FORM(), "Python 进程获取 JSON 格式结果集。");
  
                  List<Map<String, Object>> list = DatasetUtil.datasetToList(preDataset, AppConfig.ENGINE_QUERY_LIMIT, taskId, "python 进程获取 json 格式结果集。");
                  return JsonUtil.objToJson(list);
              } catch (GplException e) {
                  log.error(e.getMessage(), e);//2021-02-04,暂时没想明白 python 拉取 java 时,异常传递给前端
              }
              return null;
        }
  
  
  
        /**
           * 接收 python 返回的 json。
           * @param uuid
           * @param json
           */
        public void setDataJson(String uuid, String json) {
              PythonComd pythonComd = pythonComdMap.get(uuid);
              if (pythonComd == null) {
                  return;
              }
              if (json == null || json.isEmpty()) {
                  PythonConf conf = pythonComd.getConf();
                  log.error("任务 id:" + conf.getTaskId() + "," + conf.getComdName() + " 命令,Python 进程返回的 JSON 数据为空!");
                  return;
              }
              List<String> list = new ArrayList<>();
              list.add(json);
              SparkSession sparkSession = BeanFactory.getSparkSession();
              Dataset<String> datasetTmp = sparkSession.createDataset(list, Encoders.STRING());
              Dataset<Row> dataset = sparkSession.read().json(datasetTmp);
              pythonComd.setReturnResult(dataset);
        }
      }
  }

java端执行python代码。
public class PythonComd extends BaseComd<PythonConf> {
  
      /**
     * 单例,用于和 python 进程通信。
     */
      @Resource
      private PythonServer pythonServer;
  
      /**
     * python 进程执行结束后,回调 pythonComd.setReturnResult(dataset);
     * execSparkDataset() 方法,返回 this.returnResult;
     */
      @Setter
      private Dataset<Row> returnResult;
  
      /**
     * 输出流字符串。
     */
      private StringBuilder outputStringBuilder = new StringBuilder();
  
      /**
     * 错误流字符串。
     */
      private StringBuilder errorStringBuilder = new StringBuilder();
  
      /**
     * 构造方法,传入配置类。
     * @param conf
     */
      public PythonComd(PythonConf conf) {
        super(conf);
      }
  
  
      /**
     * 以命令行的方式,执行 python 进程。
     * @param preDataset 上一条命令生成的结果集对象
     * @return
     * @throws GplException
     */
      @Override
      public Dataset<Row> execSparkDataset(Dataset<Row> preDataset) throws GplException {
        if (preDataset == null) {
              throw new GplException("python 命令,未找到结果集!");
        }
        String script = conf.getExpression();
        String fileName = conf.getFilename();
        if (StringUtils.isEmpty(script) && StringUtils.isEmpty(fileName)) {
              throw new GplException("python 命令,脚本为空!");
        }
        if (StringUtils.isEmpty(script) && !StringUtils.isEmpty(fileName)) {//如果是执行py文件
              File pyFile = new File(NConst.PYTHON_PATH + fileName);
              if (!pyFile.exists()) {
                  throw new GplException("python 命令,GPL 的 python 目录找不到脚本文件:" + fileName);
              }
        }
        if (!StringUtils.isEmpty(AppConfig.PYTHON_ENV)) {
              File file = new File(AppConfig.PYTHON_ENV);
              if (!file.exists()) {
                  throw new GplException("python 命令,自定义环境变量,路径不存在:" + AppConfig.PYTHON_ENV);
              }
        }
  
        String uuid = conf.getUuid();
        pythonServer.openSession(uuid, this);
        String containerName = uuid;
        try {
              String scriptPath = null;
              if (!StringUtils.isEmpty(script)) {//优先执行脚本,如果没有脚本语句赐执行文件
                  String md5 = DigestUtils.md5Hex(script);
                  String tmpName = md5 + ".py";
                  scriptPath = NConst.PYTHON_PATH + tmpName;
                  File tmpFile = new File(scriptPath);//临时文件不再结束调用后删除,以后完善成定期清理
                  FileUtils.write(tmpFile, script, NConst.CHARSET_UTF_8);//把脚本写成临时文件
              } else {
                  scriptPath = NConst.PYTHON_PATH + fileName;
              }
  
              List<String> command = new ArrayList<>();
              if (!StringUtils.isEmpty(conf.getDocker())) {
                  // docker run --name containerName -v /home/nyx:/home/nyx --net host python:slim python /home/nyx/test.py 25333 uuid
                  command.add("docker");
                  command.add("run");
                  command.add("--name");
                  command.add(containerName);//容器名
                  command.add("-e");
                  command.add("TZ=Asia/Shanghai");//设置时区
                  command.add("-v");
                  command.add(NConst.PYTHON_PATH + ":" + NConst.PYTHON_PATH);// 绑定挂载目录
                  command.add("--net");
                  command.add("host");
                  command.add(conf.getDocker());// docker 镜像名
              }
  
              if (StringUtils.isEmpty(AppConfig.PYTHON_ENV)) {
                  command.add("python");
              } else {
                  command.add(AppConfig.PYTHON_ENV);
              }
              command.add(scriptPath);//py文件路径
              command.add(String.valueOf(pythonServer.getPort()));//端口号
              command.add(uuid);
              String cmd = String.join(" ", command);
              log.info("任务 id:" + conf.getTaskId() + ",python 执行命令:" + cmd);
              ProcessBuilder processBuilder = new ProcessBuilder(command);
              Process process = processBuilder.start();
              this.printProcessStream(process);
              int timeout = conf.getTimeout();
              log.info("任务 id:" + conf.getTaskId() + ",python 脚本执行超时参数为:" + timeout);
              if (timeout > 0) {
                  boolean isExited = process.waitFor(timeout, TimeUnit.MILLISECONDS);//脚本执行超时退出
                  if (!isExited) {//如果命令行子进程没有退出,销毁子进程。
                    process.destroy();
                  }
              } else {
                  process.waitFor();
              }
              this.clearTmpFile();//最后执行一下清理目录
        } catch (Exception e) {
              log.error(e.getMessage(), e);
        } finally {
              if (!StringUtils.isEmpty(conf.getDocker())) {
                  try {
                    Process process = new ProcessBuilder("docker", "rm", "-f", containerName).start();
                    this.printProcessStream(process);
                    process.waitFor();
                    log.info("任务 id:" + conf.getTaskId() + ",docker 执行 python 结束!删除容器:" + containerName);
                  } catch (Exception e) {
                    log.error(e.getMessage(), e);
                  }
              }
              pythonServer.closeSession(uuid);
        }
        if (errorStringBuilder.length() > 0) {
              String error = errorStringBuilder.toString();
              throw new GplException("python 命令,脚本执行异常!", error);
        }
        if (outputStringBuilder.length() > 0) {
              String input = outputStringBuilder.toString();
              comdTask.addWarnMsg("python 命令,控制台打印:" + input);
        }
        if (returnResult == null) {
              throw new RuntimeException("python 命令,脚本执行结束,脚本返回结果集为空!");
        }
        // 此处不要计算 count(),会消耗掉几十秒,直接打印一下字段名就行了。
        Map<String, String> fieldMap = new HashMap<>();
        StructType structType = returnResult.schema();
        StructField[] structFieldArray = structType.fields();
        for (StructField structField : structFieldArray) {
              fieldMap.put(structField.name(), structField.dataType().simpleString());
        }
        log.info("任务 id:" + conf.getTaskId() + ",python 命令,返回结果集字段:" + fieldMap.toString());
        return this.returnResult;
      }
  
      /**
     * 清理生成的临时文件
     */
      private void clearTmpFile() {
        File pyPath = new File(NConst.PYTHON_PATH);
        File[] fileArray = pyPath.listFiles(f -> {
              return f.getName().length() == 35//文件名长度 35 的是临时文件
                    && System.currentTimeMillis() - f.lastModified() > 86400000;//删除大于 1 天的文件
        });
        for (File file : fileArray) {
              file.delete();
        }
      }
  
      /**
     * 启动两个后台线程,用于接收 python 进程的输出流。
     * @param process
     */
      private void printProcessStream(Process process) {
        Thread inputThread = new Thread(() -> {
              try (BufferedReader inputReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                  String input = null;
                  while ((input = inputReader.readLine()) != null) {
                    outputStringBuilder.append("\n");
                    outputStringBuilder.append(input);
                  }
                  if (outputStringBuilder.length() > 0) {
                    log.info("任务 id:" + conf.getTaskId() + ",python print(): " + outputStringBuilder.toString());
                  }
              } catch (Exception e) {
                  log.error(e.getMessage(), e);
              }
        });
        inputThread.setName("gpl-py-input-stream-task-id=" + conf.getTaskId());
        inputThread.setDaemon(true);
        inputThread.start();
  
        Thread errorThread = new Thread(() -> {
              try (BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
                  String error = null;
                  while ((error = errorReader.readLine()) != null) {
                    errorStringBuilder.append("\n");
                    errorStringBuilder.append(error);
                  }
                  if (errorStringBuilder.length() > 0) {
                    log.error("任务 id:" + conf.getTaskId() + ",python error: " + errorStringBuilder.toString());
                  }
              } catch (Exception e) {
                  log.error(e.getMessage(), e);
              }
        });
        errorThread.setName("py-error-stream-task-id=" + conf.getTaskId());
        errorThread.setDaemon(true);
        errorThread.start();
      }
  }


oliver.tang 发表于 2023-3-15 15:17:08

有用
页: [1]
查看完整版本: Python和Java之间相互调用是怎么做到的?