51Testing软件测试论坛

 找回密码
 (注-册)加入51Testing

QQ登录

只需一步,快速开始

微信登录,快人一步

手机号码,快捷登录

查看: 898|回复: 1
打印 上一主题 下一主题

[原创] Python和Java之间相互调用是怎么做到的?

[复制链接]
  • TA的每日心情
    无聊
    4 天前
  • 签到天数: 1050 天

    连续签到: 1 天

    [LV.10]测试总司令

    跳转到指定楼层
    1#
    发表于 2022-12-19 15:47:31 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
     需求使用背景, 业务系统自定义了一套数据处理语言,支持sql,elasticsearch查询数据,然后经过一系列自定义处理,自定义处理支持执行python脚本,最终返回给调用端。
      具体执行流程,java服务执行页面输入的python语句,python语句调用java方法得到结果,python进行自定义处理后再返回给java服务,java服务最终返回具体的结果给客户端。
      语法示例 其中, | dbquery , | python 为自定义语法示例,最终是通过java执行返回结果。

    1. | dbquery sql="select id,num,create_time from security_log"
    2.   | python
    3.   import sys
    4.   from py4j.java_gateway import JavaGateway, GatewayParameters
    5.   gateway = JavaGateway(gateway_parameters=GatewayParameters(port=int(sys.argv[1])))
    6.   SESSION_ID = sys.argv[2]
    7.   
    8.   # 获取java 实例类
    9.   entry= gateway.entry_point
    10.   # 调用 java具体方法,java返回具体的json
    11.   preDataJson = entry.getPreDateJson(SESSION_ID)
    12.   
    13.   # 以下开始处理java返回的结果json
    14.   import pandas as pd
    15.   df = pd.read_json(preDataJson)
    16.   rtn_data = df[:2].to_json(orient='records')
    17.   # 处理完成的rtn_data 再设置给java 实例
    18.   entry.setDatasetJson(SESSION_ID,rtn_data )
    复制代码


    首先java服务中引用依赖。
    1.    <dependency>
    2.         <groupId>net.sf.py4j</groupId>
    3.         <artifactId>py4j</artifactId>
    4.         <version>0.10.7</version>
    5.       </dependency>
    复制代码


    首先java端定义使用pythonServer网关入口。
    1. public class PythonServer {
    2.   
    3.       /**
    4.        * python 网关服务
    5.        */
    6.       private GatewayServer gatewayServer;
    7.   
    8.       /**
    9.        * python 服务启动端口
    10.        */
    11.       private int port = GatewayServer.DEFAULT_PORT;
    12.   
    13.       /**
    14.        * 缓存当前几个 python reader
    15.        */
    16.       private Map<String, PythonComd> pythonComdMap = new ConcurrentHashMap<>();
    17.   
    18.       public PythonServer() {
    19.           for (int i = 0; i < 10; i++) {
    20.               if (gatewayServer == null) {
    21.                   try {
    22.                       gatewayServer = new GatewayServer(new EntryPoint(), port);
    23.                       gatewayServer.start();
    24.                       log.info("Python 网关启动成功,端口:" + port);
    25.                   } catch (Exception e) {
    26.                       log.warn("Python 网关启动失败,端口:" + port);
    27.                       port += 2;//每次加2
    28.                       log.warn("Python 网关尝试下一个启动端口:" + port);
    29.                   }
    30.               }
    31.           }
    32.           if (gatewayServer == null) {
    33.               log.error("Python 网关启动失败 10 次,不再进行重试,终止启动。");
    34.               System.exit(-1);
    35.           }
    36.       }
    37.   
    38.       /**
    39.        * 打开一个 java to python 的会话,实际只是 put 一个识别自身的 uuid
    40.        * @param uuid
    41.        * @param pythonComd
    42.        */
    43.       public void openSession(String uuid, PythonComd pythonComd) {
    44.           pythonComdMap.put(uuid, pythonComd);
    45.       }
    46.   
    47.       /**
    48.        * 关闭一个 java to python 的会话
    49.        * @param uuid
    50.        */
    51.       public void closeSession(String uuid) {
    52.           pythonComdMap.remove(uuid);
    53.       }
    54.   
    55.       /**
    56.        * 用于 python 与 java GatewayServer 通信的端口号
    57.        * @return
    58.        */
    59.       public int getPort() {
    60.           return port;
    61.       }
    62.   
    63.       /**
    64.        * 当 java 进程关闭时,结束 python 网关。
    65.        */
    66.       @PreDestroy
    67.       public void shutdown() {
    68.           gatewayServer.shutdown();
    69.       }
    70.   
    71.       class EntryPoint {
    72.   
    73.           /**
    74.            * 可以传递 json 给 python,但限制最大加载条数为 ENGINE_QUERY_LIMIT
    75.            * 优点:结构化数据,支持嵌套数组。缺点:空数组时没有字段名。
    76.            * @param uuid
    77.            * @return
    78.            */
    79.           public String getPreDataJson(String uuid) {
    80.               PythonComd pythonComd = pythonComdMap.get(uuid);
    81.               if (pythonComd == null) {
    82.                   return null;
    83.               }
    84.               Dataset<Row> preDataset = pythonComd.getPreDateset();
    85.               if (preDataset == null) {
    86.                   return null;
    87.               }
    88.               try {
    89.                   SparkContext sparkContext = preDataset.sparkSession().sparkContext();
    90.                   PythonConf conf = pythonComd.getConf();
    91.                   String taskId = conf.getTaskId();
    92.                   String description = AppConfig.ENGINE_ADDR + "/search?taskId=" + taskId;// 仅用于 spark web ui 显示,暂时没有扩展此接口。
    93.                   sparkContext.setJobGroup(uuid, description, true);
    94.                   sparkContext.setLocalProperty(CallSite.SHORT_FORM(), "Python 进程获取 JSON 格式结果集。");
    95.   
    96.                   List<Map<String, Object>> list = DatasetUtil.datasetToList(preDataset, AppConfig.ENGINE_QUERY_LIMIT, taskId, "python 进程获取 json 格式结果集。");
    97.                   return JsonUtil.objToJson(list);
    98.               } catch (GplException e) {
    99.                   log.error(e.getMessage(), e);//2021-02-04,暂时没想明白 python 拉取 java 时,异常传递给前端
    100.               }
    101.               return null;
    102.           }
    103.   
    104.   
    105.   
    106.           /**
    107.            * 接收 python 返回的 json。
    108.            * @param uuid
    109.            * @param json
    110.            */
    111.           public void setDataJson(String uuid, String json) {
    112.               PythonComd pythonComd = pythonComdMap.get(uuid);
    113.               if (pythonComd == null) {
    114.                   return;
    115.               }
    116.               if (json == null || json.isEmpty()) {
    117.                   PythonConf conf = pythonComd.getConf();
    118.                   log.error("任务 id:" + conf.getTaskId() + "," + conf.getComdName() + " 命令,Python 进程返回的 JSON 数据为空!");
    119.                   return;
    120.               }
    121.               List<String> list = new ArrayList<>();
    122.               list.add(json);
    123.               SparkSession sparkSession = BeanFactory.getSparkSession();
    124.               Dataset<String> datasetTmp = sparkSession.createDataset(list, Encoders.STRING());
    125.               Dataset<Row> dataset = sparkSession.read().json(datasetTmp);
    126.               pythonComd.setReturnResult(dataset);
    127.           }
    128.       }
    129.   }
    复制代码


    java端执行python代码。
    1. public class PythonComd extends BaseComd<PythonConf> {
    2.   
    3.       /**
    4.        * 单例,用于和 python 进程通信。
    5.        */
    6.       @Resource
    7.       private PythonServer pythonServer;
    8.   
    9.       /**
    10.        * python 进程执行结束后,回调 pythonComd.setReturnResult(dataset);
    11.        * execSparkDataset() 方法,返回 this.returnResult;
    12.        */
    13.       @Setter
    14.       private Dataset<Row> returnResult;
    15.   
    16.       /**
    17.        * 输出流字符串。
    18.        */
    19.       private StringBuilder outputStringBuilder = new StringBuilder();
    20.   
    21.       /**
    22.        * 错误流字符串。
    23.        */
    24.       private StringBuilder errorStringBuilder = new StringBuilder();
    25.   
    26.       /**
    27.        * 构造方法,传入配置类。
    28.        * @param conf
    29.        */
    30.       public PythonComd(PythonConf conf) {
    31.           super(conf);
    32.       }
    33.   
    34.   
    35.       /**
    36.        * 以命令行的方式,执行 python 进程。
    37.        * @param preDataset 上一条命令生成的结果集对象
    38.        * @return
    39.        * @throws GplException
    40.        */
    41.       @Override
    42.       public Dataset<Row> execSparkDataset(Dataset<Row> preDataset) throws GplException {
    43.           if (preDataset == null) {
    44.               throw new GplException("python 命令,未找到结果集!");
    45.           }
    46.           String script = conf.getExpression();
    47.           String fileName = conf.getFilename();
    48.           if (StringUtils.isEmpty(script) && StringUtils.isEmpty(fileName)) {
    49.               throw new GplException("python 命令,脚本为空!");
    50.           }
    51.           if (StringUtils.isEmpty(script) && !StringUtils.isEmpty(fileName)) {//如果是执行py文件
    52.               File pyFile = new File(NConst.PYTHON_PATH + fileName);
    53.               if (!pyFile.exists()) {
    54.                   throw new GplException("python 命令,GPL 的 python 目录找不到脚本文件:" + fileName);
    55.               }
    56.           }
    57.           if (!StringUtils.isEmpty(AppConfig.PYTHON_ENV)) {
    58.               File file = new File(AppConfig.PYTHON_ENV);
    59.               if (!file.exists()) {
    60.                   throw new GplException("python 命令,自定义环境变量,路径不存在:" + AppConfig.PYTHON_ENV);
    61.               }
    62.           }
    63.   
    64.           String uuid = conf.getUuid();
    65.           pythonServer.openSession(uuid, this);
    66.           String containerName = uuid;
    67.           try {
    68.               String scriptPath = null;
    69.               if (!StringUtils.isEmpty(script)) {//优先执行脚本,如果没有脚本语句赐执行文件
    70.                   String md5 = DigestUtils.md5Hex(script);
    71.                   String tmpName = md5 + ".py";
    72.                   scriptPath = NConst.PYTHON_PATH + tmpName;
    73.                   File tmpFile = new File(scriptPath);//临时文件不再结束调用后删除,以后完善成定期清理
    74.                   FileUtils.write(tmpFile, script, NConst.CHARSET_UTF_8);//把脚本写成临时文件
    75.               } else {
    76.                   scriptPath = NConst.PYTHON_PATH + fileName;
    77.               }
    78.   
    79.               List<String> command = new ArrayList<>();
    80.               if (!StringUtils.isEmpty(conf.getDocker())) {
    81.                   // docker run --name containerName -v /home/nyx:/home/nyx --net host python:slim python /home/nyx/test.py 25333 uuid
    82.                   command.add("docker");
    83.                   command.add("run");
    84.                   command.add("--name");
    85.                   command.add(containerName);//容器名
    86.                   command.add("-e");
    87.                   command.add("TZ=Asia/Shanghai");//设置时区
    88.                   command.add("-v");
    89.                   command.add(NConst.PYTHON_PATH + ":" + NConst.PYTHON_PATH);// 绑定挂载目录
    90.                   command.add("--net");
    91.                   command.add("host");
    92.                   command.add(conf.getDocker());// docker 镜像名
    93.               }
    94.   
    95.               if (StringUtils.isEmpty(AppConfig.PYTHON_ENV)) {
    96.                   command.add("python");
    97.               } else {
    98.                   command.add(AppConfig.PYTHON_ENV);
    99.               }
    100.               command.add(scriptPath);//py文件路径
    101.               command.add(String.valueOf(pythonServer.getPort()));//端口号
    102.               command.add(uuid);
    103.               String cmd = String.join(" ", command);
    104.               log.info("任务 id:" + conf.getTaskId() + ",python 执行命令:" + cmd);
    105.               ProcessBuilder processBuilder = new ProcessBuilder(command);
    106.               Process process = processBuilder.start();
    107.               this.printProcessStream(process);
    108.               int timeout = conf.getTimeout();
    109.               log.info("任务 id:" + conf.getTaskId() + ",python 脚本执行超时参数为:" + timeout);
    110.               if (timeout > 0) {
    111.                   boolean isExited = process.waitFor(timeout, TimeUnit.MILLISECONDS);//脚本执行超时退出
    112.                   if (!isExited) {//如果命令行子进程没有退出,销毁子进程。
    113.                       process.destroy();
    114.                   }
    115.               } else {
    116.                   process.waitFor();
    117.               }
    118.               this.clearTmpFile();//最后执行一下清理目录
    119.           } catch (Exception e) {
    120.               log.error(e.getMessage(), e);
    121.           } finally {
    122.               if (!StringUtils.isEmpty(conf.getDocker())) {
    123.                   try {
    124.                       Process process = new ProcessBuilder("docker", "rm", "-f", containerName).start();
    125.                       this.printProcessStream(process);
    126.                       process.waitFor();
    127.                       log.info("任务 id:" + conf.getTaskId() + ",docker 执行 python 结束!删除容器:" + containerName);
    128.                   } catch (Exception e) {
    129.                       log.error(e.getMessage(), e);
    130.                   }
    131.               }
    132.               pythonServer.closeSession(uuid);
    133.           }
    134.           if (errorStringBuilder.length() > 0) {
    135.               String error = errorStringBuilder.toString();
    136.               throw new GplException("python 命令,脚本执行异常!", error);
    137.           }
    138.           if (outputStringBuilder.length() > 0) {
    139.               String input = outputStringBuilder.toString();
    140.               comdTask.addWarnMsg("python 命令,控制台打印:" + input);
    141.           }
    142.           if (returnResult == null) {
    143.               throw new RuntimeException("python 命令,脚本执行结束,脚本返回结果集为空!");
    144.           }
    145.           // 此处不要计算 count(),会消耗掉几十秒,直接打印一下字段名就行了。
    146.           Map<String, String> fieldMap = new HashMap<>();
    147.           StructType structType = returnResult.schema();
    148.           StructField[] structFieldArray = structType.fields();
    149.           for (StructField structField : structFieldArray) {
    150.               fieldMap.put(structField.name(), structField.dataType().simpleString());
    151.           }
    152.           log.info("任务 id:" + conf.getTaskId() + ",python 命令,返回结果集字段:" + fieldMap.toString());
    153.           return this.returnResult;
    154.       }
    155.   
    156.       /**
    157.        * 清理生成的临时文件
    158.        */
    159.       private void clearTmpFile() {
    160.           File pyPath = new File(NConst.PYTHON_PATH);
    161.           File[] fileArray = pyPath.listFiles(f -> {
    162.               return f.getName().length() == 35//文件名长度 35 的是临时文件
    163.                       && System.currentTimeMillis() - f.lastModified() > 86400000;//删除大于 1 天的文件
    164.           });
    165.           for (File file : fileArray) {
    166.               file.delete();
    167.           }
    168.       }
    169.   
    170.       /**
    171.        * 启动两个后台线程,用于接收 python 进程的输出流。
    172.        * @param process
    173.        */
    174.       private void printProcessStream(Process process) {
    175.           Thread inputThread = new Thread(() -> {
    176.               try (BufferedReader inputReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
    177.                   String input = null;
    178.                   while ((input = inputReader.readLine()) != null) {
    179.                       outputStringBuilder.append("\n");
    180.                       outputStringBuilder.append(input);
    181.                   }
    182.                   if (outputStringBuilder.length() > 0) {
    183.                       log.info("任务 id:" + conf.getTaskId() + ",python print(): " + outputStringBuilder.toString());
    184.                   }
    185.               } catch (Exception e) {
    186.                   log.error(e.getMessage(), e);
    187.               }
    188.           });
    189.           inputThread.setName("gpl-py-input-stream-task-id=" + conf.getTaskId());
    190.           inputThread.setDaemon(true);
    191.           inputThread.start();
    192.   
    193.           Thread errorThread = new Thread(() -> {
    194.               try (BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
    195.                   String error = null;
    196.                   while ((error = errorReader.readLine()) != null) {
    197.                       errorStringBuilder.append("\n");
    198.                       errorStringBuilder.append(error);
    199.                   }
    200.                   if (errorStringBuilder.length() > 0) {
    201.                       log.error("任务 id:" + conf.getTaskId() + ",python error: " + errorStringBuilder.toString());
    202.                   }
    203.               } catch (Exception e) {
    204.                   log.error(e.getMessage(), e);
    205.               }
    206.           });
    207.           errorThread.setName("py-error-stream-task-id=" + conf.getTaskId());
    208.           errorThread.setDaemon(true);
    209.           errorThread.start();
    210.       }
    211.   }
    复制代码



    分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
    收藏收藏
    回复

    使用道具 举报

    本版积分规则

    关闭

    站长推荐上一条 /1 下一条

    小黑屋|手机版|Archiver|51Testing软件测试网 ( 沪ICP备05003035号 关于我们

    GMT+8, 2024-11-25 14:56 , Processed in 0.061171 second(s), 22 queries .

    Powered by Discuz! X3.2

    © 2001-2024 Comsenz Inc.

    快速回复 返回顶部 返回列表