Node FeedParser & Transform streams
Latest update:
FeedParser is
itself a Transform stream that operates in object mode. Nevertheless,
in the majority of examples it appears at the end of a pipeline,
e.g.:
let fp = new FeedParser()
fp.on('readable', () => {
// get the data
})
my_readable_stream.pipe(fp)
Say we want to get the first 2 headlines from an RSS feed:
$ curl -s https://emacsel.com/mp3.xml | node headlines1.js | head -2
Episode 7 - Jorgen Schäfer
Episode 6 - Charles Lowell
Let's use FeedParser as god hath intended it: as a
transform stream that reads the RSS from stdin & writes the
articles to another transform stream that grabs only the headlines &,
in turn, pipes them to stdout:
$ cat headlines1.js
let Transform = require('stream').Transform
let FeedParser = require('feedparser')
/**
 * Transform stream that consumes article objects and emits each
 * article's `title` as one line of text.
 */
class Filter extends Transform {
  constructor() {
    // Object mode on the writable side only: we eat objects but emit text.
    // Passing the option to the constructor (instead of flipping the private
    // _writableState.objectMode flag afterwards) also makes the write-side
    // highWaterMark count objects, so backpressure keeps working.
    super({ writableObjectMode: true })
  }

  /**
   * @param {object} input - article object; only its `title` is used
   * @param {string} encoding - unused (chunks are objects)
   * @param {Function} done - called once the chunk has been handled
   */
  _transform(input, encoding, done) {
    this.push(`${input.title}\n`)
    done()
  }
}
// Wire the pipeline: stdin -> FeedParser -> Filter -> stdout.
process.stdin.pipe(new FeedParser()).pipe(new Filter()).pipe(process.stdout)
(Type npm i feedparser before running the example.)
This works, although it throws an EPIPE error, because the head
command exits after printing 2 lines & closes its end of the pipe, while the script
is still trying to write into it.
You may add something like
process.stdout.on('error', (e) => e.code !== 'EPIPE' && console.error(e))
to silence it, but it's better to use the pump module (npm i pump) &
catch all errors from all the streams. There's a chance that the
module will be added to
the Node core, so get used to it already.
$ diff -u1 headlines1.js headlines3.js | tail -n+4
let FeedParser = require('feedparser')
+let pump = require('pump')
@@ -14,2 +15,3 @@
-process.stdin.pipe(new FeedParser()).pipe(new Filter()).pipe(process.stdout)
+pump(process.stdin, new FeedParser(), new Filter(), process.stdout,
+ err => err && err.code !== 'EPIPE' && console.error(err))
Now, what if we want to control the exact number of articles our
Filter stream receives? I.e., what if an RSS feed is many MBs long & we want
only n articles from it? First, we add a command-line parameter to our script:
$ cat headlines4.js
let Transform = require('stream').Transform
let FeedParser = require('feedparser')
let pump = require('pump')
/**
 * Transform stream that consumes article objects and emits the `title`
 * of the first `articles_max` of them, one per line. Later articles are
 * not emitted; each one is reported on stderr instead.
 */
class Filter extends Transform {
  /**
   * @param {number} articles_max - how many titles to let through
   */
  constructor(articles_max) {
    // Object mode on the writable side only: we eat objects but emit text.
    // Passing the option to the constructor (instead of flipping the private
    // _writableState.objectMode flag afterwards) also makes the write-side
    // highWaterMark count objects, so backpressure keeps working.
    super({ writableObjectMode: true })
    this.articles_max = articles_max
    this.articles_count = 0
  }

  /**
   * @param {object} input - article object; only its `title` is used
   * @param {string} encoding - unused (chunks are objects)
   * @param {Function} done - called once the chunk has been handled
   */
  _transform(input, encoding, done) {
    // The counter is bumped for every article, including the ignored ones,
    // so the logged number is the 1-based position of the article in the feed.
    if (this.articles_count++ < this.articles_max) {
      this.push(`${input.title}\n`)
    } else {
      console.error('ignore', this.articles_count)
    }
    done()
  }
}
// Number of headlines to print: the first command-line argument, falling
// back to 1 when it is missing, zero or not a number. Read argv[2] itself
// rather than coercing the whole argv.slice(2) array, which turns into NaN
// (and hence silently into 1) as soon as a second argument is present.
let articles_max = Number(process.argv[2]) || 1
// EPIPE is expected when the reader of our stdout (e.g. `head`) exits early,
// so it is not reported; any other error is printed to stderr.
pump(process.stdin, new FeedParser(), new Filter(articles_max), process.stdout,
  err => err && err.code !== 'EPIPE' && console.error(err))
Although this works too, it still downloads & parses the articles we don't
want:
$ curl -s https://emacsel.com/mp3.xml | node headlines4.js 2
Episode 7 - Jorgen Schäfer
Episode 6 - Charles Lowell
ignore 3
ignore 4
ignore 5
ignore 6
ignore 7
Unfortunately, to be able to 'unpipe' a readable stream (from the
Filter standpoint it's the FeedParser instance) we have to
have a ref to it, & I don't know a way to get such a ref from
within a Transform stream, except by explicitly passing a pointer:
$ diff -u headlines4.js headlines5.js | tail -n+4
let pump = require('pump')
class Filter extends Transform {
- constructor(articles_max) {
+ constructor(articles_max, feedparser) {
super()
this._writableState.objectMode = true // we can eat objects
this.articles_max = articles_max
this.articles_count = 0
+
+ if (feedparser) {
+ this.once('unpipe', () => {
+ this.end() // ensure 'finish' event gets emited
+ })
+ }
+ this.feedparser = feedparser
}
_transform(input, encoding, done) {
if (this.articles_count++ < this.articles_max) {
this.push(input.title + '\n')
} else {
- console.error('ignore', this.articles_count)
+ console.error('stop on', this.articles_count)
+ if (this.feedparser) this.feedparser.unpipe(this)
}
done()
}
}
let articles_max = Number(process.argv.slice(2)) || 1
-pump(process.stdin, new FeedParser(), new Filter(articles_max), process.stdout,
+let fp = new FeedParser()
+pump(process.stdin, fp, new Filter(articles_max, fp), process.stdout,
err => err && err.code !== 'EPIPE' && console.error(err))
Test:
$ curl -s https://emacsel.com/mp3.xml | node headlines5.js 2
Episode 7 - Jorgen Schäfer
Episode 6 - Charles Lowell
stop on 3
Grab the gist w/ a final version here.
Tags: ойті
Authors: ag