montón de nodos agotado al canalizar datos JSONStream.parsed () a través de es.map () y JSONStream.stringify () a la secuencia de archivos

Estoy tratando de canalizar un flujo de entrada (creado a partir de un enorme archivo GeoJSON) a través de JSONStream.parse () para dividir el flujo en objetos, luego a través de event-stream.map () para que pueda transformar el objeto, luego a través de JSONStream .stringify () para crear una cadena a partir de ella, y finalmente a una secuencia de salida grabable. A medida que se ejecuta el proceso, puedo ver que la huella de memoria del nodo continúa creciendo hasta que finalmente agota el montón. Aquí está el script más simple (test.js) que recrea el problema:

const fs = require("fs") const es = require("event-stream") const js = require("JSONStream") out = fs.createWriteStream("/dev/null") process.stdin .pipe(js.parse("features.*")) .pipe(es.map( function(data, cb) { cb(null, data); return; } )) .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}")) .pipe(out) 

Una pequeña secuencia de comandos de bash (barf.sh) que arroja un flujo interminable de JSON en process.stdin del nodo hará que el montón del nodo crezca gradualmente:

 #!/bin/bash echo '{"type":"FeatureCollection","features":[' while : do echo '{"type":"Feature","properties":{"name":"A Street"}, "geometry":{"type":"LineString"} },' done 

ejecutándolo como tal:

 barf.sh | node test.js 

Hay un par de formas curiosas para eludir el problema:

  • Elimine fs.createWriteStream () y cambie la última etapa de canalización de “.pipe (out)” a “.pipe (process.stdout)” y luego canalice la salida estándar del nodo a / dev / null
  • Cambie el es.map () asíncrono al es.mapSync síncrono ()

Cualquiera de las dos acciones anteriores permitirá que el script se ejecute para siempre, con una huella de memoria del nodo baja e invariable. Estoy usando node v6.3.1, event-stream v3.3.4 y JSONStream 1.1.4 en una máquina de ocho núcleos con 8 GB de RAM que ejecuta Ubuntu 16.04.

Espero que alguien pueda ayudarme a corregir lo que estoy seguro de que es un error obvio de mi parte.

JSONStream no es un flujo de streams2 , por lo que no admite la contrapresión. (Hay un breve resumen sobre streams2 aquí .)

Eso significa que los datos saldrán del flujo de parse en data eventos de data y que el flujo los seguirá bombeando independientemente de si el flujo de consumo está listo para ellos. Si hay alguna discrepancia en algún punto de la tubería entre lo rápido que se puede leer y escribir algo, habrá un búfer, que es lo que está viendo.

El arnés de barf.sh ve características bombeadas a través de la stdin . Si, en cambio, estaba leyendo un archivo masivo, debería poder administrar el flujo deteniendo la secuencia de lectura del archivo. Por lo tanto, si insertara alguna lógica de pause/resume en su callback del map , debería poder procesarla en un archivo masivo; Sólo llevaría un poco más de tiempo. Experimentaría con algo como esto:

 let in = fs.createReadStream("/some/massive/file"); let out = fs.createWriteStream("/dev/null"); in .pipe(js.parse("features.*")) .pipe(es.map(function(data, cb) { // This is just an example; a 10-millisecond wait per feature would be very slow. if (!in.isPaused()) { in.pause(); global.setTimeout(function () { in.resume(); }, 10); } cb(null, data); return; })) .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}")) .pipe(out); 

Por cierto, el uso de mapSync hace poca o ninguna diferencia en mi computadora (que es antigua y lenta). Sin embargo, a menos que tenga alguna operación asincrónica para realizar en el map , iría con mapSync .

    Intereting Posts