Лучший способ научиться чему-то в разработке — это пойти и попробовать создать что-то свое, что бы это ни было. В этой статье я проведу вас через создание минимального примера торрент-приложения с использованием Node JS и библиотеки swenssonp2p.
Настоятельно рекомендую прочитать и прокомментировать мою предыдущую статью о создании библиотеки p2p с нуля, чтобы понять эту.
Итак, Torrent — это P2P-сеть, позволяющая обмениваться файлами между пользователями. Основная идея заключается в том, что один файл может появляться у разных пиров, и, разбивая потоки загрузки на части, пиры могут ускорить загрузку файлов. Сеть P2P используется для обмена мета-информацией о файлах, в то время как для фактической загрузки используется отдельное TCP-соединение непосредственно с файлом.
В этой статье я не буду реализовывать пиявки, но вы можете найти этот код в репозитории позже.
Итак, прежде всего, мне нужно придумать какой-то интерфейс для конечного пользователя, чтобы он мог делиться файлами с помощью этого приложения. Я решил просто индексировать все внутри process.cwd()
при запуске приложения.
Для хранения файлов я решил использовать Map, где хэш файла будет ключом. Я также решил, что не хочу, чтобы этот процесс мешал пользователю делать то, что он хочет, и поместил индексацию в асинхронную функцию, которая не ожидается. Реализация hashFile
зависит от вас.
const path = require('path');
const { readdir, stat } = require('fs/promises');
const index = new Map();
async function* findFiles (folder) {
for (let filename of await readdir(folder)) {
const filepath = path.resolve(folder, filename);
const filestats = await stat(filepath);
if (filestats.isDirectory()) {
yield* findFiles(filepath);
} else {
yield { path: filepath, size: filestats.size };
}
}
}
;(async () => {
console.log('Start indexing files...');
for await (let { path, size } of findFiles(process.cwd())) {
const [name] = path.split('/').slice(-1);
const hash = await hashFile(path);
index.set(hash, { hash, size, name, path });
}
console.log(`Directory content indexed, ${index.size} files found`);
})();
Следующее, что я хочу сделать, это создать P2P сеть. Я использую swenssonp2p
и просто вызываю createNode
. Это создаст общий узел сети p2p локально, после чего я запускаю listen
, чтобы начать принимать соединения.
Я не знаю точно, что я буду делать после запуска, думаю, что должно быть много вещей, поэтому я оставляю вызов эмиттера событий (socket), который я могу заполнить слушателями позже. Чтобы иметь возможность подписаться до вызова обратного вызова слушателя, я откладываю вызов слушателя до завершения всего синхронного кода.
const EventEmitter = require('events');
const createNode = require('swenssonp2p');
const main = new EventEmitter();
const node = createNode();
const port = Number(process.argv[2]);
setTimeout(() => {
node.listen(port, () => main.emit('startup', port));
}, 0);
После запуска узла я продолжаю работу и сообщаю пользователю, что он может делать. Я хочу использовать тот же интерфейс (ввод команд в process.stdin), который я использовал в приложении чата, но я не знаю точно, какие команды должны быть, поэтому я оставляю там сокет (на самом деле два).
main.on('startup', (port) => {
console.log(`Node is up on ${port}.`);
console.log('');
main.emit('help');
process.stdin.on('data', (data) => main.emit('command', data.toString()));
});
Первой командой, как и в приложении чата, будет команда connect
.
main.on('help', () => {
console.log(' write "connect IP:PORT" to connect to other nodes on the network.');
});
main.on('command', (text) => {
if (text.startsWith('connect')) {
const ipport = text.substr(8);
const [ip, port] = ipport.split(':');
console.log(`Connecting to ${ip} at ${Number(port)}...`);
node.connect(ip, Number(port), () => {
console.log(`Connection to ${ip} established.`);
});
}
});
Теперь я хочу, чтобы пользователь мог сначала искать файлы. Я реализую поиск только по имени, но вы можете добавить и другие параметры в эту команду. Индекс также не помогает нам искать файлы, но мы будем использовать его позже, обещаю.
main.on('help', () => {
console.log(' write "search FILENAME" to look for files.');
});
// Once the command arrives, we broadcast the search message on the network
main.on('command', (text) => {
if (text.startsWith('search')) {
const searchRequest = text.substr(7).trim();
console.log(`Searching for file by "${searchRequest}"...`);
node.broadcast({ type: 'search', meta: searchRequest });
}
});
// Once we receive this message (on another node), we reply with results
node.on('broadcast', ({ origin, message: { type, meta }}) => {
if (type === 'search' && origin !== node.id) {
for (let key of index.keys()) {
const data = index.get(key);
if (data.name.toLowerCase().includes(meta.toLowerCase())) {
node.direct(origin, { type: 'search/response', meta: data });
}
}
}
});
// Once we receive the response from the file holder, we display it
node.on('direct', ({ origin, message: { type, meta: { name, size, hash } }}) => {
if (type === 'search/response') {
console.log(` ${name} ${formatSize(size)} ${hash}`);
}
});
Этот поток в стиле пинг-понга легко реализовать, но он кажется нестабильным, поскольку теоретически мы можем получить search/response
, когда поиск не был выполнен, и это все равно вызовет console.log. Я не считаю это проблемой, но проверка безопасности здесь не помешает.
Следующее, что я хочу сделать, это чтобы пользователь мог начать загрузку. Поскольку хэш используется для индекса, мы можем использовать его в качестве параметра команды, что имеет смысл (подобно тому, как вы можете создать magnet-ссылки с хэшами файлов и попросить приложение загрузить их без выполнения поиска).
Я не знаю, что я буду делать, когда начнется загрузка, поэтому я оставляю там сокет.
main.on('help', () => {
console.log(' write "download HASH" to start downloading file');
});
main.on('command', (text) => {
if (text.startsWith('download')) {
const hash = text.substr(9).trim();
main.emit('download', hash);
}
});
Для того чтобы скачать файл, мы должны установить отдельное TCP-соединение с пирами и запросить у них куски данных. Количество кусков и имя файла — это не та информация, которую мы имеем локально, даже если мы могли получить ее через команду поиска, это не гарантировано. Итак, прежде всего, я хочу настроить поток пинг-понга для обмена метаинформацией файла перед началом загрузки. Это будет примерно то же самое, что и поток поиска, но в конце я буду хранить обменную информацию в downloads
и выдавать события, когда она изменится.
Как вы можете видеть, информация об обмене также содержит IP-адрес семени, чтобы я мог подключиться к его файловому серверу во время загрузки позже.
const downloads = {};
main.on('download', (hash) => {
node.broadcast({ type: 'download', meta: hash });
});
node.on('broadcast', ({ origin, message: { type, meta } }) => {
if (type === 'download' && origin !== node.id) {
const data = index.get(meta);
if (!!data) {
node.direct(origin, { type: 'download/response', meta: { ip: Array.from(node.addresses)[0], hash: data.hash, size: data.size, name: data.name } })
}
}
});
node.on('direct', ({ origin, message: { type, meta } }) => {
if (type === 'download/response') {
if (!downloads[meta.hash]) {
downloads[meta.hash] = {
hash,
name: meta.name,
size: meta.size,
seeds: [meta.ip],
chunks: [],
};
main.emit('download/ready', meta.hash);
} else {
downloads[meta.hash].seeds.push(meta.ip);
main.emit('download/update', meta.hash);
}
}
});
Итак, пришло время создать TCP-сервер, который будет реагировать на запросы файловых данных и отправлять данные. Мы будем обмениваться данными по частям, поэтому файловому серверу нужно будет реагировать только на один определенный тип сообщения и отправлять один тип сообщения обратно.
const FILES_SERVER_PORT = 9019;
const CHUNK_SIZE = 512;
const filesServer = net.createServer((socket) => {
socket.on('data', (data) => {
const { hash, offset } = JSON.parse(data);
const meta = index.get(hash);
const chunk = Buffer.alloc(CHUNK_SIZE);
const file = await open(meta.path, 'r');
await file.read(chunk, 0, CHUNK_SIZE, offset * CHUNK_SIZE);
await file.close();
socket.write(JSON.stringify({ hash, offset, chunk }));
});
}).listen(FILES_SERVER_PORT);
Хорошо, теперь пришло время реализовать фактическую загрузку. Я начну с реакции на событие download/ready
и сделаю асинхронный цикл, который будет получать чанки из семян параллельно, по одному чанку от одного семени за раз, но вы, безусловно, можете это подправить.
Чтобы отслеживать состояние чанка, я заполняю поле chunks
метаинформации его статусом и сокетом, с которого он загружает данные.
main.on('download/ready', async (hash) => {
downloads[hash].chunks = [...new Array(Math.ceil(downloads[hash].size / CHUNK_SIZE))].map(() => ({ state: 0 }));
});
Кроме того, мне нужен временный файл для сохранения загрузки, давайте назначим его и создадим для него файловый хэндл.
downloads[hash].path = path.resolve(DOWNLOADS_PATH, `${hash}.download`);
const file = await open(downloads[hash].path, 'w');
Теперь мне нужно подключиться к IP-адресам, указанным в downloads
Я знаю, что после срабатывания события download/ready
некоторые уже есть, но мне также нужно реагировать на события download/update
для обновления списка. Я прикрепляю слушателя к этому событию и отсоединяю его, когда загрузка завершена.
const sockets = {};
const updateSocketsList = async ($hash) => {
if ($hash !== hash) {
return;
}
for (let ip of downloads[hash].seeds) {
if (!sockets[ip]) {
const socket = new net.Socket();
socket.connect(FILES_SERVER_PORT, ip, () => {
sockets[ip] = { socket, busy: false };
});
}
}
};
updateSocketsList(hash);
main.on('download/update', updateSocketsList);
// ... TODO
main.off('download/update', updateSocketsList);
Основной цикл довольно прост, я ищу доступный чанк (состояние чанка 0
готов, 1
загружается и 2
уже загружен) для загрузки и сокет, который не занят. Если нет сокета (значит, все они заняты) или нет чанка (значит, все они загружаются), я просто продолжаю
после 50 мс задержки. Если доступны и чанк, и сокет, я загружаю, но не жду окончания загрузки.
while (!!downloads[hash].chunks.find((chunk) => chunk.state !== 2)) {
const availableChunkIndex = downloads[hash].chunks.findIndex((chunk) => chunk.state === 0);
const availableSocket = Object.values(sockets).find(({ busy }) => !busy);
if (!availableSocket || !availableChunkIndex) {
await new Promise((resolve) => setTimeout(() => resolve(), 50));
continue;
}
availableSocket.busy = true;
downloads[hash].chunks[availableChunkIndex].state = 1;
;(async () => {
const chunk = await downloadChunk(availableSocket.socket, hash, availableChunkIndex);
await file.write(Buffer.from(chunk), 0, CHUNK_SIZE, availableChunkIndex * CHUNK_SIZE);
downloads[hash].chunks[availableChunkIndex].state = 2;
availableSocket.busy = false;
})();
}
Как вы видите, мне нужно реализовать только функцию downloadChunk
, которая будет фактически захватывать данные из сокета. Я хочу, чтобы это была асинхронная функция, в то время как сокет является эмиттером событий, поэтому мне нужно сделать следующее:
const downloadChunk = (socket, hash, offset) => new Promise((resolve) => {
socket.write(JSON.stringify({ hash, offset }));
const listener = (message) => {
if (hash === message.hash && offset === message.offset) {
resolve(message.chunk);
socket.off('data', listener);
}
};
socket.on('data', listener);
});
Теперь мне остается только очистить файл, закрыв хэндл файла, переименовав временный файл в то имя, которое он должен иметь, удалив слушателей download/update
и закрыв сокеты с семенами.
await file.close();
await rename(downloads[hash].path, path.resolve(DOWNLOADS_PATH, downloads[hash].name));
main.off('download/update', updateSocketsList);
for (let { socket } of Object.values(sockets)) {
socket.destroy();
}
Вот так с помощью Node и swenssonp2p можно сделать простейшее Torrent-приложение менее чем за 300 строк кода. Полный код этого приложения можно найти здесь.