1.5.23. fejezet, STDIO

Poll stdout

# invoke process
process = subprocess.Popen(shlex.split(command),shell=False,stdout=process.PIPE)
 
# Poll process.stdout to show stdout live
while True:
  output = process.stdout.readline()
  if process.poll() is not None:
    break
  if output:
    print output.strip()
rc = process.poll()

Non-blocking IO

Python subprocess non-blocking and non-breaking communicate

    args = shlex.split('mongosh localhost:27017')
    request="db = db.getSiblingDB('populations')"
    process = pexpect.spawn(args[0], args[1:], timeout=5, encoding='utf-8')
    process.sendline(f"{request}\n")
    def read_all(process):
        buff = StringIO()
        while not process.terminated:
            try:
                result = process.read_nonblocking(1, timeout=2)
                if len(result) > 0:
                    buff.write(result)
            except pexpect.exceptions.TIMEOUT:
                break
        return buff