Создание торрент-приложения с помощью Node с нуля.

Лучший способ научиться чему-то в разработке — это пойти и попробовать создать что-то свое, что бы это ни было. В этой статье я проведу вас через создание минимального примера торрент-приложения с использованием Node JS и библиотеки swenssonp2p.

Настоятельно рекомендую прочитать и прокомментировать мою предыдущую статью о создании библиотеки p2p с нуля, чтобы понять эту.


Итак, Torrent — это P2P-сеть, позволяющая обмениваться файлами между пользователями. Основная идея заключается в том, что один файл может появляться у разных пиров, и, разбивая потоки загрузки на части, пиры могут ускорить загрузку файлов. Сеть P2P используется для обмена мета-информацией о файлах, в то время как для фактической загрузки используется отдельное TCP-соединение непосредственно с файлом.

В этой статье я не буду реализовывать пиявки, но вы можете найти этот код в репозитории позже.

Итак, прежде всего, мне нужно придумать какой-то интерфейс для конечного пользователя, чтобы он мог делиться файлами с помощью этого приложения. Я решил просто индексировать все внутри process.cwd() при запуске приложения.

Для хранения файлов я решил использовать Map, где хэш файла будет ключом. Я также решил, что не хочу, чтобы этот процесс мешал пользователю делать то, что он хочет, и поместил индексацию в асинхронную функцию, которая не ожидается. Реализация hashFile зависит от вас.

const path = require('path');
const { readdir, stat } = require('fs/promises');

const index = new Map();

async function* findFiles (folder) {
  for (let filename of await readdir(folder)) {
    const filepath = path.resolve(folder, filename);
    const filestats = await stat(filepath);

    if (filestats.isDirectory()) {
      yield* findFiles(filepath);
    } else {
      yield { path: filepath, size: filestats.size };
    }
  }
}

;(async () => {
  console.log('Start indexing files...');

  for await (let { path, size } of findFiles(process.cwd())) {
    const [name] = path.split('/').slice(-1);
    const hash = await hashFile(path);

    index.set(hash, { hash, size, name, path });
  }

  console.log(`Directory content indexed, ${index.size} files found`);
})();
Вход в полноэкранный режим Выход из полноэкранного режима

Следующее, что я хочу сделать, это создать P2P сеть. Я использую swenssonp2p и просто вызываю createNode. Это создаст общий узел сети p2p локально, после чего я запускаю listen, чтобы начать принимать соединения.

Я не знаю точно, что я буду делать после запуска, думаю, что должно быть много вещей, поэтому я оставляю вызов эмиттера событий (socket), который я могу заполнить слушателями позже. Чтобы иметь возможность подписаться до вызова обратного вызова слушателя, я откладываю вызов слушателя до завершения всего синхронного кода.

const EventEmitter = require('events');
const createNode = require('swenssonp2p');

const main = new EventEmitter();

const node = createNode();
const port = Number(process.argv[2]);

setTimeout(() => {
  node.listen(port, () => main.emit('startup', port));
}, 0);
Вход в полноэкранный режим Выход из полноэкранного режима

После запуска узла я продолжаю работу и сообщаю пользователю, что он может делать. Я хочу использовать тот же интерфейс (ввод команд в process.stdin), который я использовал в приложении чата, но я не знаю точно, какие команды должны быть, поэтому я оставляю там сокет (на самом деле два).

main.on('startup', (port) => {
  console.log(`Node is up on ${port}.`);
  console.log('');

  main.emit('help');

  process.stdin.on('data', (data) => main.emit('command', data.toString()));
});
Войти в полноэкранный режим Выйти из полноэкранного режима

Первой командой, как и в приложении чата, будет команда connect.

main.on('help', () => {
  console.log('  write "connect IP:PORT" to connect to other nodes on the network.');
});

main.on('command', (text) => {
  if (text.startsWith('connect')) {
    const ipport = text.substr(8);
    const [ip, port] = ipport.split(':');

    console.log(`Connecting to ${ip} at ${Number(port)}...`);
    node.connect(ip, Number(port), () => {
      console.log(`Connection to ${ip} established.`);
    });
  }
});
Вход в полноэкранный режим Выход из полноэкранного режима

Теперь я хочу, чтобы пользователь мог сначала искать файлы. Я реализую поиск только по имени, но вы можете добавить и другие параметры в эту команду. Индекс также не помогает нам искать файлы, но мы будем использовать его позже, обещаю.

main.on('help', () => {
  console.log('  write "search FILENAME" to look for files.');
});

// Once the command arrives, we broadcast the search message on the network
main.on('command', (text) => {
  if (text.startsWith('search')) {
    const searchRequest = text.substr(7).trim();

    console.log(`Searching for file by "${searchRequest}"...`);
    node.broadcast({ type: 'search', meta: searchRequest });
  }
});

// Once we receive this message (on another node), we reply with results
node.on('broadcast', ({ origin, message: { type, meta }}) => {
  if (type === 'search' && origin !== node.id) {
    for (let key of index.keys()) {
      const data = index.get(key);

      if (data.name.toLowerCase().includes(meta.toLowerCase())) {
        node.direct(origin, { type: 'search/response', meta: data });
      }
    }
  }
});

// Once we receive the response from the file holder, we display it
node.on('direct', ({ origin, message: { type, meta: { name, size, hash } }}) => {
  if (type === 'search/response') {
    console.log(`  ${name} ${formatSize(size)} ${hash}`);
  }
});
Войти в полноэкранный режим Выйти из полноэкранного режима

Этот поток в стиле пинг-понга легко реализовать, но он кажется нестабильным, поскольку теоретически мы можем получить search/response, когда поиск не был выполнен, и это все равно вызовет console.log. Я не считаю это проблемой, но проверка безопасности здесь не помешает.

Следующее, что я хочу сделать, это чтобы пользователь мог начать загрузку. Поскольку хэш используется для индекса, мы можем использовать его в качестве параметра команды, что имеет смысл (подобно тому, как вы можете создать magnet-ссылки с хэшами файлов и попросить приложение загрузить их без выполнения поиска).

Я не знаю, что я буду делать, когда начнется загрузка, поэтому я оставляю там сокет.

main.on('help', () => {
  console.log('  write "download HASH" to start downloading file');
});

main.on('command', (text) => {
  if (text.startsWith('download')) {
    const hash = text.substr(9).trim();

    main.emit('download', hash);
  }
});
Вход в полноэкранный режим Выход из полноэкранного режима

Для того чтобы скачать файл, мы должны установить отдельное TCP-соединение с пирами и запросить у них куски данных. Количество кусков и имя файла — это не та информация, которую мы имеем локально, даже если мы могли получить ее через команду поиска, это не гарантировано. Итак, прежде всего, я хочу настроить поток пинг-понга для обмена метаинформацией файла перед началом загрузки. Это будет примерно то же самое, что и поток поиска, но в конце я буду хранить обменную информацию в downloads и выдавать события, когда она изменится.

Как вы можете видеть, информация об обмене также содержит IP-адрес семени, чтобы я мог подключиться к его файловому серверу во время загрузки позже.

const downloads = {};

main.on('download', (hash) => {
  node.broadcast({ type: 'download', meta: hash });
});

node.on('broadcast', ({ origin, message: { type, meta } }) => {
  if (type === 'download' && origin !== node.id) {
    const data = index.get(meta);

    if (!!data) {
      node.direct(origin, { type: 'download/response', meta: { ip: Array.from(node.addresses)[0], hash: data.hash, size: data.size, name: data.name } })
    }
  }
});

node.on('direct', ({ origin, message: { type, meta } }) => {
  if (type === 'download/response') {
    if (!downloads[meta.hash]) {
      downloads[meta.hash] = {
        hash,
        name: meta.name,
        size: meta.size,
        seeds: [meta.ip],
        chunks: [],
      };

      main.emit('download/ready', meta.hash);
    } else {
      downloads[meta.hash].seeds.push(meta.ip);
      main.emit('download/update', meta.hash);
    }
  }
});
Вход в полноэкранный режим Выход из полноэкранного режима

Итак, пришло время создать TCP-сервер, который будет реагировать на запросы файловых данных и отправлять данные. Мы будем обмениваться данными по частям, поэтому файловому серверу нужно будет реагировать только на один определенный тип сообщения и отправлять один тип сообщения обратно.

const FILES_SERVER_PORT = 9019;
const CHUNK_SIZE = 512;

const filesServer = net.createServer((socket) => {
  socket.on('data', (data) => {
    const { hash, offset } = JSON.parse(data);
    const meta = index.get(hash);

    const chunk = Buffer.alloc(CHUNK_SIZE);
    const file = await open(meta.path, 'r');

    await file.read(chunk, 0, CHUNK_SIZE, offset * CHUNK_SIZE);
    await file.close();

    socket.write(JSON.stringify({ hash, offset, chunk }));
  });
}).listen(FILES_SERVER_PORT);
Вход в полноэкранный режим Выход из полноэкранного режима

Хорошо, теперь пришло время реализовать фактическую загрузку. Я начну с реакции на событие download/ready и сделаю асинхронный цикл, который будет получать чанки из семян параллельно, по одному чанку от одного семени за раз, но вы, безусловно, можете это подправить.

Чтобы отслеживать состояние чанка, я заполняю поле chunks метаинформации его статусом и сокетом, с которого он загружает данные.

main.on('download/ready', async (hash) => {
  downloads[hash].chunks = [...new Array(Math.ceil(downloads[hash].size / CHUNK_SIZE))].map(() => ({ state: 0 }));
});
Вход в полноэкранный режим Выход из полноэкранного режима

Кроме того, мне нужен временный файл для сохранения загрузки, давайте назначим его и создадим для него файловый хэндл.

downloads[hash].path = path.resolve(DOWNLOADS_PATH, `${hash}.download`);

const file = await open(downloads[hash].path, 'w');
Вход в полноэкранный режим Выход из полноэкранного режима

Теперь мне нужно подключиться к IP-адресам, указанным в downloads Я знаю, что после срабатывания события download/ready некоторые уже есть, но мне также нужно реагировать на события download/update для обновления списка. Я прикрепляю слушателя к этому событию и отсоединяю его, когда загрузка завершена.

const sockets = {};

const updateSocketsList = async ($hash) => {
  if ($hash !== hash) {
    return;
  }

  for (let ip of downloads[hash].seeds) {
    if (!sockets[ip]) {
      const socket = new net.Socket();

      socket.connect(FILES_SERVER_PORT, ip, () => {
        sockets[ip] = { socket, busy: false };
      });
    }
  }
};

updateSocketsList(hash);

main.on('download/update', updateSocketsList);

// ... TODO

main.off('download/update', updateSocketsList);
Вход в полноэкранный режим Выход из полноэкранного режима

Основной цикл довольно прост, я ищу доступный чанк (состояние чанка 0 готов, 1 загружается и 2 уже загружен) для загрузки и сокет, который не занят. Если нет сокета (значит, все они заняты) или нет чанка (значит, все они загружаются), я просто продолжаю после 50 мс задержки. Если доступны и чанк, и сокет, я загружаю, но не жду окончания загрузки.

while (!!downloads[hash].chunks.find((chunk) => chunk.state !== 2)) {
  const availableChunkIndex = downloads[hash].chunks.findIndex((chunk) => chunk.state === 0);
  const availableSocket = Object.values(sockets).find(({ busy }) => !busy);

  if (!availableSocket || !availableChunkIndex) {
    await new Promise((resolve) => setTimeout(() => resolve(), 50));
    continue;
  }

  availableSocket.busy = true;
  downloads[hash].chunks[availableChunkIndex].state = 1;

  ;(async () => {
    const chunk = await downloadChunk(availableSocket.socket, hash, availableChunkIndex);

    await file.write(Buffer.from(chunk), 0, CHUNK_SIZE, availableChunkIndex * CHUNK_SIZE);

    downloads[hash].chunks[availableChunkIndex].state = 2;
    availableSocket.busy = false;
  })();
}
Вход в полноэкранный режим Выход из полноэкранного режима

Как вы видите, мне нужно реализовать только функцию downloadChunk, которая будет фактически захватывать данные из сокета. Я хочу, чтобы это была асинхронная функция, в то время как сокет является эмиттером событий, поэтому мне нужно сделать следующее:

const downloadChunk = (socket, hash, offset) => new Promise((resolve) => {
  socket.write(JSON.stringify({ hash, offset }));

  const listener = (message) => {
    if (hash === message.hash && offset === message.offset) {
      resolve(message.chunk);
      socket.off('data', listener);
    }
  };

  socket.on('data', listener);
});
Войти в полноэкранный режим Выйти из полноэкранного режима

Теперь мне остается только очистить файл, закрыв хэндл файла, переименовав временный файл в то имя, которое он должен иметь, удалив слушателей download/update и закрыв сокеты с семенами.

await file.close();
await rename(downloads[hash].path, path.resolve(DOWNLOADS_PATH, downloads[hash].name));

main.off('download/update', updateSocketsList);

for (let { socket } of Object.values(sockets)) {
  socket.destroy();
}
Вход в полноэкранный режим Выход из полноэкранного режима

Вот так с помощью Node и swenssonp2p можно сделать простейшее Torrent-приложение менее чем за 300 строк кода. Полный код этого приложения можно найти здесь.

Оставьте комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *