作者: lsekfe    时间: 2022-12-19 15:47
标题: Python和Java之间相互调用是怎么做到的?
 需求使用背景, 业务系统自定义了一套数据处理语言,支持sql,elasticsearch查询数据,然后经过一系列自定义处理,自定义处理支持执行python脚本,最终返回给调用端。
  语法示例 其中, | dbquery , | python 为自定义语法示例,最终是通过java执行返回结果。

  1. | dbquery sql="select id,num,create_time from security_log"
  2.   | python
  3.   import sys
  4.   from py4j.java_gateway import JavaGateway, GatewayParameters
  5.   gateway = JavaGateway(gateway_parameters=GatewayParameters(port=int(sys.argv[1])))
  6.   SESSION_ID = sys.argv[2]
  8.   # 获取java 实例类
  9.   entry= gateway.entry_point
  10.   # 调用 java具体方法,java返回具体的json
  11.   preDataJson = entry.getPreDateJson(SESSION_ID)
  13.   # 以下开始处理java返回的结果json
  14.   import pandas as pd
  15.   df = pd.read_json(preDataJson)
  16.   rtn_data = df[:2].to_json(orient='records')
  17.   # 处理完成的rtn_data 再设置给java 实例
  18.   entry.setDatasetJson(SESSION_ID,rtn_data )

  1.    <dependency>
  2.         <groupId>net.sf.py4j</groupId>
  3.         <artifactId>py4j</artifactId>
  4.         <version>0.10.7</version>
  5.       </dependency>

  1. public class PythonServer {
  3.       /**
  4.        * python 网关服务
  5.        */
  6.       private GatewayServer gatewayServer;
  8.       /**
  9.        * python 服务启动端口
  10.        */
  11.       private int port = GatewayServer.DEFAULT_PORT;
  13.       /**
  14.        * 缓存当前几个 python reader
  15.        */
  16.       private Map<String, PythonComd> pythonComdMap = new ConcurrentHashMap<>();
  18.       public PythonServer() {
  19.           for (int i = 0; i < 10; i++) {
  20.               if (gatewayServer == null) {
  21.                   try {
  22.                       gatewayServer = new GatewayServer(new EntryPoint(), port);
  23.                       gatewayServer.start();
  24.                       log.info("Python 网关启动成功,端口:" + port);
  25.                   } catch (Exception e) {
  26.                       log.warn("Python 网关启动失败,端口:" + port);
  27.                       port += 2;//每次加2
  28.                       log.warn("Python 网关尝试下一个启动端口:" + port);
  29.                   }
  30.               }
  31.           }
  32.           if (gatewayServer == null) {
  33.               log.error("Python 网关启动失败 10 次,不再进行重试,终止启动。");
  34.               System.exit(-1);
  35.           }
  36.       }
  38.       /**
  39.        * 打开一个 java to python 的会话,实际只是 put 一个识别自身的 uuid
  40.        * @param uuid
  41.        * @param pythonComd
  42.        */
  43.       public void openSession(String uuid, PythonComd pythonComd) {
  44.           pythonComdMap.put(uuid, pythonComd);
  45.       }
  47.       /**
  48.        * 关闭一个 java to python 的会话
  49.        * @param uuid
  50.        */
  51.       public void closeSession(String uuid) {
  52.           pythonComdMap.remove(uuid);
  53.       }
  55.       /**
  56.        * 用于 python 与 java GatewayServer 通信的端口号
  57.        * @return
  58.        */
  59.       public int getPort() {
  60.           return port;
  61.       }
  63.       /**
  64.        * 当 java 进程关闭时,结束 python 网关。
  65.        */
  66.       @PreDestroy
  67.       public void shutdown() {
  68.           gatewayServer.shutdown();
  69.       }
  71.       class EntryPoint {
  73.           /**
  74.            * 可以传递 json 给 python,但限制最大加载条数为 ENGINE_QUERY_LIMIT
  75.            * 优点:结构化数据,支持嵌套数组。缺点:空数组时没有字段名。
  76.            * @param uuid
  77.            * @return
  78.            */
  79.           public String getPreDataJson(String uuid) {
  80.               PythonComd pythonComd = pythonComdMap.get(uuid);
  81.               if (pythonComd == null) {
  82.                   return null;
  83.               }
  84.               Dataset<Row> preDataset = pythonComd.getPreDateset();
  85.               if (preDataset == null) {
  86.                   return null;
  87.               }
  88.               try {
  89.                   SparkContext sparkContext = preDataset.sparkSession().sparkContext();
  90.                   PythonConf conf = pythonComd.getConf();
  91.                   String taskId = conf.getTaskId();
  92.                   String description = AppConfig.ENGINE_ADDR + "/search?taskId=" + taskId;// 仅用于 spark web ui 显示,暂时没有扩展此接口。
  93.                   sparkContext.setJobGroup(uuid, description, true);
  94.                   sparkContext.setLocalProperty(CallSite.SHORT_FORM(), "Python 进程获取 JSON 格式结果集。");
  96.                   List<Map<String, Object>> list = DatasetUtil.datasetToList(preDataset, AppConfig.ENGINE_QUERY_LIMIT, taskId, "python 进程获取 json 格式结果集。");
  97.                   return JsonUtil.objToJson(list);
  98.               } catch (GplException e) {
  99.                   log.error(e.getMessage(), e);//2021-02-04,暂时没想明白 python 拉取 java 时,异常传递给前端
  100.               }
  101.               return null;
  102.           }
  106.           /**
  107.            * 接收 python 返回的 json。
  108.            * @param uuid
  109.            * @param json
  110.            */
  111.           public void setDataJson(String uuid, String json) {
  112.               PythonComd pythonComd = pythonComdMap.get(uuid);
  113.               if (pythonComd == null) {
  114.                   return;
  115.               }
  116.               if (json == null || json.isEmpty()) {
  117.                   PythonConf conf = pythonComd.getConf();
  118.                   log.error("任务 id:" + conf.getTaskId() + "," + conf.getComdName() + " 命令,Python 进程返回的 JSON 数据为空!");
  119.                   return;
  120.               }
  121.               List<String> list = new ArrayList<>();
  122.               list.add(json);
  123.               SparkSession sparkSession = BeanFactory.getSparkSession();
  124.               Dataset<String> datasetTmp = sparkSession.createDataset(list, Encoders.STRING());
  125.               Dataset<Row> dataset = sparkSession.read().json(datasetTmp);
  126.               pythonComd.setReturnResult(dataset);
  127.           }
  128.       }
  129.   }

  1. public class PythonComd extends BaseComd<PythonConf> {
  3.       /**
  4.        * 单例,用于和 python 进程通信。
  5.        */
  6.       @Resource
  7.       private PythonServer pythonServer;
  9.       /**
  10.        * python 进程执行结束后,回调 pythonComd.setReturnResult(dataset);
  11.        * execSparkDataset() 方法,返回 this.returnResult;
  12.        */
  13.       @Setter
  14.       private Dataset<Row> returnResult;
  16.       /**
  17.        * 输出流字符串。
  18.        */
  19.       private StringBuilder outputStringBuilder = new StringBuilder();
  21.       /**
  22.        * 错误流字符串。
  23.        */
  24.       private StringBuilder errorStringBuilder = new StringBuilder();
  26.       /**
  27.        * 构造方法,传入配置类。
  28.        * @param conf
  29.        */
  30.       public PythonComd(PythonConf conf) {
  31.           super(conf);
  32.       }
  35.       /**
  36.        * 以命令行的方式,执行 python 进程。
  37.        * @param preDataset 上一条命令生成的结果集对象
  38.        * @return
  39.        * @throws GplException
  40.        */
  41.       @Override
  42.       public Dataset<Row> execSparkDataset(Dataset<Row> preDataset) throws GplException {
  43.           if (preDataset == null) {
  44.               throw new GplException("python 命令,未找到结果集!");
  45.           }
  46.           String script = conf.getExpression();
  47.           String fileName = conf.getFilename();
  48.           if (StringUtils.isEmpty(script) && StringUtils.isEmpty(fileName)) {
  49.               throw new GplException("python 命令,脚本为空!");
  50.           }
  51.           if (StringUtils.isEmpty(script) && !StringUtils.isEmpty(fileName)) {//如果是执行py文件
  52.               File pyFile = new File(NConst.PYTHON_PATH + fileName);
  53.               if (!pyFile.exists()) {
  54.                   throw new GplException("python 命令,GPL 的 python 目录找不到脚本文件:" + fileName);
  55.               }
  56.           }
  57.           if (!StringUtils.isEmpty(AppConfig.PYTHON_ENV)) {
  58.               File file = new File(AppConfig.PYTHON_ENV);
  59.               if (!file.exists()) {
  60.                   throw new GplException("python 命令,自定义环境变量,路径不存在:" + AppConfig.PYTHON_ENV);
  61.               }
  62.           }
  64.           String uuid = conf.getUuid();
  65.           pythonServer.openSession(uuid, this);
  66.           String containerName = uuid;
  67.           try {
  68.               String scriptPath = null;
  69.               if (!StringUtils.isEmpty(script)) {//优先执行脚本,如果没有脚本语句赐执行文件
  70.                   String md5 = DigestUtils.md5Hex(script);
  71.                   String tmpName = md5 + ".py";
  72.                   scriptPath = NConst.PYTHON_PATH + tmpName;
  73.                   File tmpFile = new File(scriptPath);//临时文件不再结束调用后删除,以后完善成定期清理
  74.                   FileUtils.write(tmpFile, script, NConst.CHARSET_UTF_8);//把脚本写成临时文件
  75.               } else {
  76.                   scriptPath = NConst.PYTHON_PATH + fileName;
  77.               }
  79.               List<String> command = new ArrayList<>();
  80.               if (!StringUtils.isEmpty(conf.getDocker())) {
  81.                   // docker run --name containerName -v /home/nyx:/home/nyx --net host python:slim python /home/nyx/test.py 25333 uuid
  82.                   command.add("docker");
  83.                   command.add("run");
  84.                   command.add("--name");
  85.                   command.add(containerName);//容器名
  86.                   command.add("-e");
  87.                   command.add("TZ=Asia/Shanghai");//设置时区
  88.                   command.add("-v");
  89.                   command.add(NConst.PYTHON_PATH + ":" + NConst.PYTHON_PATH);// 绑定挂载目录
  90.                   command.add("--net");
  91.                   command.add("host");
  92.                   command.add(conf.getDocker());// docker 镜像名
  93.               }
  95.               if (StringUtils.isEmpty(AppConfig.PYTHON_ENV)) {
  96.                   command.add("python");
  97.               } else {
  98.                   command.add(AppConfig.PYTHON_ENV);
  99.               }
  100.               command.add(scriptPath);//py文件路径
  101.               command.add(String.valueOf(pythonServer.getPort()));//端口号
  102.               command.add(uuid);
  103.               String cmd = String.join(" ", command);
  104.               log.info("任务 id:" + conf.getTaskId() + ",python 执行命令:" + cmd);
  105.               ProcessBuilder processBuilder = new ProcessBuilder(command);
  106.               Process process = processBuilder.start();
  107.               this.printProcessStream(process);
  108.               int timeout = conf.getTimeout();
  109.               log.info("任务 id:" + conf.getTaskId() + ",python 脚本执行超时参数为:" + timeout);
  110.               if (timeout > 0) {
  111.                   boolean isExited = process.waitFor(timeout, TimeUnit.MILLISECONDS);//脚本执行超时退出
  112.                   if (!isExited) {//如果命令行子进程没有退出,销毁子进程。
  113.                       process.destroy();
  114.                   }
  115.               } else {
  116.                   process.waitFor();
  117.               }
  118.               this.clearTmpFile();//最后执行一下清理目录
  119.           } catch (Exception e) {
  120.               log.error(e.getMessage(), e);
  121.           } finally {
  122.               if (!StringUtils.isEmpty(conf.getDocker())) {
  123.                   try {
  124.                       Process process = new ProcessBuilder("docker", "rm", "-f", containerName).start();
  125.                       this.printProcessStream(process);
  126.                       process.waitFor();
  127.                       log.info("任务 id:" + conf.getTaskId() + ",docker 执行 python 结束!删除容器:" + containerName);
  128.                   } catch (Exception e) {
  129.                       log.error(e.getMessage(), e);
  130.                   }
  131.               }
  132.               pythonServer.closeSession(uuid);
  133.           }
  134.           if (errorStringBuilder.length() > 0) {
  135.               String error = errorStringBuilder.toString();
  136.               throw new GplException("python 命令,脚本执行异常!", error);
  137.           }
  138.           if (outputStringBuilder.length() > 0) {
  139.               String input = outputStringBuilder.toString();
  140.               comdTask.addWarnMsg("python 命令,控制台打印:" + input);
  141.           }
  142.           if (returnResult == null) {
  143.               throw new RuntimeException("python 命令,脚本执行结束,脚本返回结果集为空!");
  144.           }
  145.           // 此处不要计算 count(),会消耗掉几十秒,直接打印一下字段名就行了。
  146.           Map<String, String> fieldMap = new HashMap<>();
  147.           StructType structType = returnResult.schema();
  148.           StructField[] structFieldArray = structType.fields();
  149.           for (StructField structField : structFieldArray) {
  150.               fieldMap.put(structField.name(), structField.dataType().simpleString());
  151.           }
  152.           log.info("任务 id:" + conf.getTaskId() + ",python 命令,返回结果集字段:" + fieldMap.toString());
  153.           return this.returnResult;
  154.       }
  156.       /**
  157.        * 清理生成的临时文件
  158.        */
  159.       private void clearTmpFile() {
  160.           File pyPath = new File(NConst.PYTHON_PATH);
  161.           File[] fileArray = pyPath.listFiles(f -> {
  162.               return f.getName().length() == 35//文件名长度 35 的是临时文件
  163.                       && System.currentTimeMillis() - f.lastModified() > 86400000;//删除大于 1 天的文件
  164.           });
  165.           for (File file : fileArray) {
  166.               file.delete();
  167.           }
  168.       }
  170.       /**
  171.        * 启动两个后台线程,用于接收 python 进程的输出流。
  172.        * @param process
  173.        */
  174.       private void printProcessStream(Process process) {
  175.           Thread inputThread = new Thread(() -> {
  176.               try (BufferedReader inputReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
  177.                   String input = null;
  178.                   while ((input = inputReader.readLine()) != null) {
  179.                       outputStringBuilder.append("\n");
  180.                       outputStringBuilder.append(input);
  181.                   }
  182.                   if (outputStringBuilder.length() > 0) {
  183.                       log.info("任务 id:" + conf.getTaskId() + ",python print(): " + outputStringBuilder.toString());
  184.                   }
  185.               } catch (Exception e) {
  186.                   log.error(e.getMessage(), e);
  187.               }
  188.           });
  189.           inputThread.setName("gpl-py-input-stream-task-id=" + conf.getTaskId());
  190.           inputThread.setDaemon(true);
  191.           inputThread.start();
  193.           Thread errorThread = new Thread(() -> {
  194.               try (BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
  195.                   String error = null;
  196.                   while ((error = errorReader.readLine()) != null) {
  197.                       errorStringBuilder.append("\n");
  198.                       errorStringBuilder.append(error);
  199.                   }
  200.                   if (errorStringBuilder.length() > 0) {
  201.                       log.error("任务 id:" + conf.getTaskId() + ",python error: " + errorStringBuilder.toString());
  202.                   }
  203.               } catch (Exception e) {
  204.                   log.error(e.getMessage(), e);
  205.               }
  206.           });
  207.           errorThread.setName("py-error-stream-task-id=" + conf.getTaskId());
  208.           errorThread.setDaemon(true);
  209.           errorThread.start();
  210.       }
  211.   }

